
By default, a C++ object wrapped with Boost.Python cannot be pickled; calling pickle.dumps(obj) fails with:

Pickling of "xxx" instances is not enabled.

A side note: even in the latest code, the reference link given in that error message is still broken. The correct one is https://www.boost.org/doc/libs/1_74_0/libs/python/doc/html/reference/topics/pickle_support.html.

Making your class support the pickle protocol

The "canonical" way to make your C++ class picklable is to use the boost::python::pickle_suite that Boost provides. Let the code do the talking:

struct world_t {
    world_t(const std::string& country) { ... }
    std::string country() const { ... } // used by getinitargs below
};

struct world_pickle_suite : boost::python::pickle_suite
  {
    static
    boost::python::tuple
    getinitargs(const world_t& w)
    {
      // [Optional] Return a boost::python::tuple whose values are used to
      // reconstruct the object on unpickling. If the class's constructor
      // takes no arguments, this method can be omitted.
      return boost::python::make_tuple(w.country());
    }

    static
    boost::python::tuple
    getstate(const world_t& w)
    {
      // [Optional] If the constructor alone cannot fully restore the
      // object's state, return the remaining state here.
      return boost::python::make_tuple(/* ... */);
    }

    static
    void
    setstate(world_t& w, boost::python::tuple state)
    {
      // [Optional] If the constructor alone cannot fully restore the
      // object's state, apply the values in `state` back onto w here.
    }
  };

boost::python::class_<world_t>("world", boost::python::init<const std::string&>())
  // define the Boost.Python wrapper class for world_t
  .def_pickle(world_pickle_suite());
  // ...

Note that if the suite defines getstate/setstate and the class's __dict__ is non-empty, Boost will raise an error. A typical case is wrapping a subclass; you then also need to implement the __getstate_manages_dict__ attribute yourself. I won't go into the details here; see the pickle_support documentation linked above.

How it works, briefly

As is well known, to make a custom class picklable in Python 3, you need at least one of the following:

  • its __dict__ attribute is picklable, or;
  • the value returned by __getstate__() is picklable; __setstate__() is then called during unpickling to restore the state.

Behind these hooks, at a slightly lower level, everything goes through the __reduce__() method (strictly speaking, pickle first checks whether __reduce_ex__() is available). In short, __reduce__() returns a tuple holding everything needed to reconstruct the object and restore its state.

In fact, these methods are part of the copy protocol which implements the __reduce__() special method. -- https://docs.python.org/3/library/pickle.html
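In pure Python, this pair of hooks looks like the sketch below (the class and its attributes are made up for illustration): __getstate__ drops an unpicklable attribute, and __setstate__ rebuilds it on the way back.

```python
import pickle

class Counter:
    """Illustrative class holding one unpicklable attribute."""
    def __init__(self):
        self.count = 0
        self.on_change = lambda n: n  # lambdas cannot be pickled

    def __getstate__(self):
        # Return only the picklable part of the state.
        return {"count": self.count}

    def __setstate__(self, state):
        # Called on unpickle: restore the state, rebuild the rest.
        self.count = state["count"]
        self.on_change = lambda n: n

c = Counter()
c.count = 7
c2 = pickle.loads(pickle.dumps(c))
print(c2.count)  # 7
```

Without __getstate__, pickle.dumps(c) would fail on the lambda stored in __dict__.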

So boost::python implements an object's __reduce__() method itself, in src/object/pickle_support.cpp (see the source file). It mainly does the following:

  1. Get the current object's __class__ attribute (i.e. the class object);
  2. Check that the object has a __safe_for_unpickling__ attribute. If not, you get the error quoted at the top of this post;
  3. Check for a __getinitargs__() method and call it if present. If absent, unpickling falls back to a no-argument constructor;
  4. Check for a __getstate__() method and call it if present;
  5. Check for a __dict__ attribute; if present, also check for __getstate_manages_dict__ and fetch both values.
  6. Return a tuple() whose items are the values collected in steps 1-5.

As you can see, this __reduce__() follows Python's object.__reduce__() contract. Of course, if inheriting from a suite class feels cumbersome or doesn't meet your needs, you can get there by other means: just wire up a __reduce__() method of your own that satisfies the Python contract.

class_obj.attr("__reduce__") = boost::python::object(/*YOUR REDUCE FUNCTION*/);
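For reference, a hand-written __reduce__() that honors the contract looks like this in pure Python (a minimal sketch; the World class is a hypothetical stand-in for the wrapped world_t):

```python
import pickle

class World:
    """Hypothetical stand-in for the wrapped world_t."""
    def __init__(self, country):
        self.country = country
        self.visited = []

    def __reduce__(self):
        # (callable, ctor args, state handed to __setstate__)
        return (World, (self.country,), {"visited": self.visited})

    def __setstate__(self, state):
        self.visited = state["visited"]

w = World("cn")
w.visited.append("sh")
w2 = pickle.loads(pickle.dumps(w))
print(w2.country, w2.visited)  # cn ['sh']
```

The first two tuple items play the role of getinitargs(), the third the role of getstate()/setstate().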

Going a bit deeper~

If you are as bored as I am, let's look at how CPython itself uses __getstate__() when it needs __reduce__().

In cpython/Objects/typeobject.c there is a function named

static PyObject* _common_reduce(PyObject *self, int proto)

which, for pickle protocol >= 2, calls static PyObject *reduce_newobj(PyObject *obj). That function is the default reduce logic for most Python objects. It contains this line:

state = _PyObject_GetState(obj,
                !hasargs && !PyList_Check(obj) && !PyDict_Check(obj));

_PyObject_GetState roughly fetches the object's state from its __dict__ or __slots__ attributes.

A bit further up there is also a call to _PyObject_GetNewArguments(), whose logic fetches __getnewargs_ex__ or __getnewargs__ and uses the result as the arguments passed to __new__ when the new object is created. These two are among the four "high-level" methods the pickle protocol documentation recommends overriding; correspondingly, Python would rather ordinary users not hand-write a __reduce__() at all.
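A quick pure-Python illustration of __getnewargs__ (the Point class here is made up): for an immutable type whose data is fixed in __new__, pickle needs the constructor arguments back.

```python
import pickle

class Point(tuple):
    """Immutable pair; all data is baked in at __new__ time."""
    def __new__(cls, x, y):
        return super().__new__(cls, (x, y))

    def __getnewargs__(self):
        # Arguments that pickle passes to __new__ on unpickling.
        return (self[0], self[1])

p = Point(1, 2)
p2 = pickle.loads(pickle.dumps(p))
print(p2, type(p2).__name__)  # (1, 2) Point
```

reduce_newobj picks these arguments up via _PyObject_GetNewArguments and stores them in the reduce tuple.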

With all that in mind, look back at boost::python: getinitargs() and getstate() in the pickle_suite map to __getnewargs__() and __getstate__() respectively.

Background

A while back I noticed that in Jupyter Notebook, when a lot of data is resident in memory, spinning up a multiprocessing pool is remarkably slow. At first I blamed Python, but after swapping multiprocessing.pool for direct os.fork() calls the problem persisted. In theory, forking on Unix uses copy-on-write, so memory growth should be modest; yet htop showed memory climbing right after each fork, linearly with the number of processes.

Cause

After puzzling over it for quite a while, I figured it might be related to page tables. A quick check confirmed that huge pages were indeed disabled on the server in question (no idea why...).

Briefly, fork copies the parent's page tables (held via the process control block, PCB) into the new process's PCB; the memory contents themselves are not copied. The Notebook's data-processing job was already using a lot of memory (100 GB+), so with the default 4 KB pages there are 100 GB / 4 KB = 26,214,400 page-table entries to copy! At 8 bytes per page-table entry (PTE) on x86-64, the page tables alone cost each forked process roughly 200 MB of memory.
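The back-of-the-envelope numbers above can be checked in a few lines (the sizes are assumptions for a 100 GB process on x86-64):

```python
mem_bytes = 100 * 1024**3        # ~100 GB resident in the parent process
page_size = 4 * 1024             # default 4 KB pages
pte_size = 8                     # one PTE is 8 bytes on x86-64

entries = mem_bytes // page_size            # last-level PTEs the child must receive
overhead_mb = entries * pte_size // 1024**2 # page-table bytes per forked child

print(entries, overhead_mb)  # 26214400 200

# With 2 MB huge pages the entry count collapses by a factor of 512:
print(mem_bytes // (2 * 1024**2))  # 51200
```

Which is why enabling huge pages makes both the fork latency and the per-child memory jump far less dramatic.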

Adding multi-threaded XZ decompression support on Unix (based on the 7-Zip LZMA SDK, C/C++)

Note

This post has nothing to do with the pixz project. It is about decompressing a regular .xz archive using 7-Zip's LZMA SDK in a Unix environment.

Background

Originally, the 7-Zip LZMA SDK (version 19.00) only supports parallel xz decompression on Windows. This post shows the C code that adds pthread support, i.e. Unix systems. Compiling the C library as C++ should also work; I have tested it on my own box.

Little Details

Actually, the original author had already completed all the necessary abstractions for the multi-threading pipeline. All I did was add some macros and pthread equivalents of the Windows threading model.

Usage

Replace the corresponding files under lzma/C/... with the files below. The new code should work on both Windows and Unix systems.

Git Repo

See https://github.com/idailylife/lzma-xz-parallel

Source Code

Threads.h

/* Threads.h -- multithreading library
2017-06-18 : Igor Pavlov : Public domain */

#ifndef __7Z_THREADS_H
#define __7Z_THREADS_H

#ifdef _WIN32
#include <windows.h>
#else
#include <pthread.h>
#endif

#include "7zTypes.h"

EXTERN_C_BEGIN

WRes HandlePtr_Close(HANDLE *h);
WRes Handle_WaitObject(HANDLE h);

#ifdef _WIN32
typedef HANDLE CThread;
#define Thread_Construct(p) *(p) = NULL
#define Thread_WasCreated(p) (*(p) != NULL)
#define Thread_Close(p) HandlePtr_Close(p)
#define Thread_Wait(p) Handle_WaitObject(*(p))
#else
typedef void* LPVOID;
typedef pthread_t* CThread;
#define Thread_Construct(p) *(p) = NULL
#define Thread_WasCreated(p) (*(p) != NULL)
#define Thread_Close(p) HandleThread_Close(*(p))
#define Thread_Wait(p) HandleThread_Join(*(p))
WRes HandleThread_Close(pthread_t* th);
WRes HandleThread_Join(pthread_t* th);

#endif

#ifdef _WIN32
typedef
#ifdef UNDER_CE
  DWORD
#else
  unsigned
#endif
  THREAD_FUNC_RET_TYPE;
#define THREAD_FUNC_CALL_TYPE MY_STD_CALL
#else
/* pthread thread functions return void* and use the default calling convention */
typedef void* THREAD_FUNC_RET_TYPE;
#define THREAD_FUNC_CALL_TYPE
#endif

#define THREAD_FUNC_DECL THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE
typedef THREAD_FUNC_RET_TYPE (THREAD_FUNC_CALL_TYPE * THREAD_FUNC_TYPE)(void *);
WRes Thread_Create(CThread *p, THREAD_FUNC_TYPE func, LPVOID param);

#ifdef _WIN32

typedef HANDLE CEvent;
typedef CEvent CAutoResetEvent;
typedef CEvent CManualResetEvent;
#define Event_Construct(p) *(p) = NULL
#define Event_IsCreated(p) (*(p) != NULL)
#define Event_Close(p) HandlePtr_Close(p)
#define Event_Wait(p) Handle_WaitObject(*(p))
WRes Event_Set(CEvent *p);
WRes Event_Reset(CEvent *p);
WRes ManualResetEvent_Create(CManualResetEvent *p, int signaled); // not used
WRes ManualResetEvent_CreateNotSignaled(CManualResetEvent *p); // not used
WRes AutoResetEvent_Create(CAutoResetEvent *p, int signaled);
WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent *p);

#else
#include <stdbool.h>

typedef struct {
  bool state;
  bool manual_reset;  /* needed by Event_Set in Threads.c */
  pthread_mutex_t mutex;
  pthread_cond_t cond;
} event_t;

typedef event_t* CEvent;
typedef CEvent CAutoResetEvent;
#define Event_Construct(p) *(p) = NULL
#define Event_IsCreated(p) (*(p) != NULL)

WRes Event_Close(CEvent* p);
WRes Event_Set(CEvent *p);
WRes Event_Reset(CEvent *p);
WRes Event_Wait(CEvent* p);
WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent* p);

#endif

// CSemaphore is not used for decoding
#ifdef _WIN32
typedef HANDLE CSemaphore;
#define Semaphore_Construct(p) *(p) = NULL
#define Semaphore_IsCreated(p) (*(p) != NULL)
#define Semaphore_Close(p) HandlePtr_Close(p)
#define Semaphore_Wait(p) Handle_WaitObject(*(p))
WRes Semaphore_Create(CSemaphore *p, UInt32 initCount, UInt32 maxCount);
WRes Semaphore_ReleaseN(CSemaphore *p, UInt32 num);
WRes Semaphore_Release1(CSemaphore *p);
#endif

#ifdef _WIN32

typedef CRITICAL_SECTION CCriticalSection;
WRes CriticalSection_Init(CCriticalSection *p);
#define CriticalSection_Delete(p) DeleteCriticalSection(p)
#define CriticalSection_Enter(p) EnterCriticalSection(p)
#define CriticalSection_Leave(p) LeaveCriticalSection(p)

#else
/// use a mutex instead
typedef pthread_mutex_t* CCriticalSection;
WRes CriticalSection_Init(CCriticalSection *p);
WRes CriticalSection_Delete(CCriticalSection *p);
WRes CriticalSection_Enter(CCriticalSection *p);
WRes CriticalSection_Leave(CCriticalSection *p);

#endif
EXTERN_C_END

#endif

Threads.c

/* Threads.c -- multithreading library
2017-06-26 : Igor Pavlov : Public domain */

#include "Precomp.h"

#ifdef _WIN32
  #ifndef UNDER_CE
  #include <process.h>
  #endif
#endif

#include "Threads.h"

#ifdef _WIN32
static WRes GetError()
{
  DWORD res = GetLastError();
  return res ? (WRes)res : 1;
}

static WRes HandleToWRes(HANDLE h) { return (h != NULL) ? 0 : GetError(); }
static WRes BOOLToWRes(BOOL v) { return v ? 0 : GetError(); }

WRes HandlePtr_Close(HANDLE *p)
{
  if (*p != NULL)
  {
    if (!CloseHandle(*p))
      return GetError();
    *p = NULL;
  }
  return 0;
}

WRes Handle_WaitObject(HANDLE h) { return (WRes)WaitForSingleObject(h, INFINITE); }

#else
/// unix specific functions
#include <stdlib.h>
#include <string.h>
#include <assert.h>

WRes HandleThread_Close(pthread_t* th) {
  free(th); /* allocated in Thread_Create */
  return 0;
}

WRes HandleThread_Join(pthread_t* th) {
  return pthread_join(*th, NULL);
}

#endif

#ifdef _WIN32
WRes Thread_Create(CThread *p, THREAD_FUNC_TYPE func, LPVOID param)
{
  /* Windows Me/98/95: threadId parameter may not be NULL in _beginthreadex/CreateThread functions */

  #ifdef UNDER_CE

  DWORD threadId;
  *p = CreateThread(0, 0, func, param, 0, &threadId);

  #else

  unsigned threadId;
  *p = (HANDLE)_beginthreadex(NULL, 0, func, param, 0, &threadId);

  #endif

  /* maybe we must use errno here, but probably GetLastError() is also OK. */
  return HandleToWRes(*p);
}
#else
pthread_attr_t g_th_attrs[64]; //NOTE: maximum of 64 threads
size_t g_th_index = 0;

WRes Thread_Create(CThread *p, THREAD_FUNC_TYPE func, LPVOID param)
{
  pthread_t* th;
  int ret;
  assert(g_th_index < 64); // the attribute pool above is fixed-size
  *p = malloc(sizeof(pthread_t));
  th = *p;
  ret = pthread_attr_init(&(g_th_attrs[g_th_index]));
  assert(ret == 0);
  ret = pthread_create(th, &(g_th_attrs[g_th_index]), func, param);
  g_th_index++;
  return ret;
}
#endif

#ifdef _WIN32
static WRes Event_Create(CEvent *p, BOOL manualReset, int signaled)
{
  *p = CreateEvent(NULL, manualReset, (signaled ? TRUE : FALSE), NULL);
  return HandleToWRes(*p);
}

WRes Event_Set(CEvent *p) { return BOOLToWRes(SetEvent(*p)); }
WRes Event_Reset(CEvent *p) { return BOOLToWRes(ResetEvent(*p)); }

WRes ManualResetEvent_Create(CManualResetEvent *p, int signaled) { return Event_Create(p, TRUE, signaled); }
WRes AutoResetEvent_Create(CAutoResetEvent *p, int signaled) { return Event_Create(p, FALSE, signaled); }
WRes ManualResetEvent_CreateNotSignaled(CManualResetEvent *p) { return ManualResetEvent_Create(p, 0); }
WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent *p) { return AutoResetEvent_Create(p, 0); }

#else 
///unix

WRes Event_Close(CEvent* p) {
  if (!p || !(*p))
    return 0;
  event_t* evt = *p;
  pthread_cond_destroy(&evt->cond);
  pthread_mutex_destroy(&evt->mutex);
  free(evt);
  *p = NULL;
  return 0;
}

WRes Event_Set(CEvent *p) {
  event_t* evt = *p;
  if (pthread_mutex_lock(&evt->mutex) != 0) {
    return 1;
  }
  evt->state = true;

  if (evt->manual_reset) {
    if (pthread_cond_broadcast(&evt->cond)) {
      pthread_mutex_unlock(&evt->mutex);
      return 1;
    }
  } else {
    if (pthread_cond_signal(&evt->cond)) {
      pthread_mutex_unlock(&evt->mutex);
      return 1;
    }
  }

  if (pthread_mutex_unlock(&evt->mutex)) {
    return 1;
  }

  return 0;
}

WRes Event_Reset(CEvent* p) {
  event_t* evt = *p;
  if (pthread_mutex_lock(&evt->mutex)) {
    return 1;
  }

  evt->state = false;

  if (pthread_mutex_unlock(&evt->mutex)) {
    return 1;
  }

  return 0;
}

WRes Event_Wait(CEvent* p) {
  event_t* evt = *p;
  if (pthread_mutex_lock(&evt->mutex)) {
    return 1;
  }

  while (!evt->state) {
    if (pthread_cond_wait(&evt->cond, &evt->mutex)) {
      pthread_mutex_unlock(&evt->mutex);
      return 1;
    }
  }

  evt->state = false;
  if (pthread_mutex_unlock(&evt->mutex)) {
    return 1;
  }
  return 0;
}

WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent *p) {
  event_t* evt = malloc(sizeof(event_t));
  memset(evt, 0, sizeof(event_t));
  evt->state = false;
  evt->manual_reset = false;
  *p = evt;
  if (pthread_mutex_init(&evt->mutex, NULL)) {
    return 1;
  }
  if (pthread_cond_init(&evt->cond, NULL)) {
    return 1;
  }
  return 0;
}

#endif

#ifdef _WIN32

WRes Semaphore_Create(CSemaphore *p, UInt32 initCount, UInt32 maxCount)
{
  *p = CreateSemaphore(NULL, (LONG)initCount, (LONG)maxCount, NULL);
  return HandleToWRes(*p);
}

static WRes Semaphore_Release(CSemaphore *p, LONG releaseCount, LONG *previousCount)
  { return BOOLToWRes(ReleaseSemaphore(*p, releaseCount, previousCount)); }
WRes Semaphore_ReleaseN(CSemaphore *p, UInt32 num)
  { return Semaphore_Release(p, (LONG)num, NULL); }
WRes Semaphore_Release1(CSemaphore *p) { return Semaphore_ReleaseN(p, 1); }

#endif

#ifdef _WIN32
WRes CriticalSection_Init(CCriticalSection *p)
{
  /* InitializeCriticalSection can raise only STATUS_NO_MEMORY exception */
  #ifdef _MSC_VER
  __try
  #endif
  {
    InitializeCriticalSection(p);
    /* InitializeCriticalSectionAndSpinCount(p, 0); */
  }
  #ifdef _MSC_VER
  __except (EXCEPTION_EXECUTE_HANDLER) { return 1; }
  #endif
  return 0;
}

#else
WRes CriticalSection_Init(CCriticalSection *p) {
  *p = malloc(sizeof(pthread_mutex_t));
  if (pthread_mutex_init(*p, NULL)) {
    return 1;
  }
  return 0;
}

WRes CriticalSection_Delete(CCriticalSection *p) {
  pthread_mutex_t* mtx = *p;
  return pthread_mutex_destroy(mtx);
}

WRes CriticalSection_Enter(CCriticalSection *p) {
  if (pthread_mutex_lock(*p)) {
    return 1;
  }
  return 0;
}

WRes CriticalSection_Leave(CCriticalSection *p) {
  if (pthread_mutex_unlock(*p)) {
    return 1;
  }
  return 0;
}

#endif

XzDec.c

Add the following macros:

#ifndef _WIN32
#define S_OK 0x00000000
#define E_FAIL 0x80004005
#endif

MtDec.c

Replace the function ThreadFunc1 with:

static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc1(void* pp)
{
  static int g_ok_stat = 0x0;
  static int g_err_stat = 0x80004005;
  WRes res;
  CMtDecThread *t = (CMtDecThread*)pp;
  CMtDec *p;

  res = ThreadFunc2(t);
  p = t->mtDec;
  if (res == 0) {
#ifdef _WIN32
    return p->exitThreadWRes;
#else
    if (p->exitThreadWRes) { return &g_err_stat; }
    else { return &g_ok_stat; }
#endif
  }
  {
    // it's unexpected situation for some threading function error
    if (p->exitThreadWRes == 0)
      p->exitThreadWRes = res;
    PRF(printf("\nthread exit error = %d\n", res));
    p->exitThread = True;
    Event_Set(&p->threads[0].canRead);
    Event_Set(&p->threads[0].canWrite);
    MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));
  }
#ifdef _WIN32
    return res;
#else
    return &g_err_stat;
#endif
}

C++11 finally brought a threading model into the standard; before that, C++ threading was every library for itself, each with its own exotic usage. Two fun pieces are std::promise<T> and std::future<T>, which the book referenced at the end of this post likens to a "wormhole":

  • std::future is the wormhole's exit: a "getter" that will, at some future moment, yield a useful value, but not yet...
  • std::promise is the wormhole's entrance: we promise to feed a value in at some future moment, just not now...

Basic usage

They are generally used as a pair. Some code:

#include <cassert>
#include <chrono>
#include <future>
#include <iostream>
#include <string>
#include <thread>

using namespace std::chrono_literals;

int main()
{
  std::promise<int> p1, p2;
  std::future<int> f1 = p1.get_future();
  std::future<int> f2 = p2.get_future();

  p1.set_value(42);
  assert(f1.get() == 42);

  std::thread t([&]() {
    std::this_thread::sleep_for(100ms);
    p2.set_value(43);
  });

  auto start_time = std::chrono::system_clock::now();
  assert(f2.get() == 43);
  std::chrono::duration<double, std::milli> elapsed = std::chrono::system_clock::now() - start_time;
  std::cout << "Waited " << elapsed.count() << " ms\n";
  t.join();
  return 0;
}

// output: Waited 100.253 ms

A future object is created via promise.get_future(). When we finally have a real value to feed the promise object, we call promise.set_value(v). Meanwhile (usually on another thread), when the value is needed, we call future.get(), which blocks until a valid value has been filled in.

Worth mentioning: there is a specialization for T = void. What is that good for? Blocking until some parallel task has finished:

std::promise<void> ready_p;
std::future<void> ready_f = ready_p.get_future();

std::thread thread_b([&]() {
    prep_work();
    ready_p.set_value(); // no arg
    main_work();
});
ready_f.wait();
// prep_work() on thread B has now completed
thread_b.join();

A bit of detail

Keep in mind that both promise and future carry the cost of dynamic memory allocation. (Figure: std_promise_future_schema.PNG, taken from the reference.)

Note the State object in the figure: it is essentially managed through a shared_ptr, because both ends of the wormhole (quite possibly different threads) need the shared object (shared ownership). So constructing a std::promise/std::future pair always allocates fresh heap space.

Reference

This post is based throughout on the book: Mastering the C++17 STL: Make Full Use of the Standard Library Components in C++17

std::variant is meant as a replacement for union, which has been around since the C era but offers no type-safety guarantees. Boost has had an implementation for a long time (really since C++11; it feels like no small part of Boost has been absorbed into the standard). The std::variant class looks like this:

template <class... Types>
class variant;

Usage

It is quite convenient to use:

std::variant<int, double> v1;
v1 = 1; // activate the "int" member
assert(v1.index() == 0);
assert(std::get<0>(v1) == 1);

v1 = 3.14; // activate the "double" member
assert(v1.index() == 1);
assert(std::get<1>(v1) == 3.14);
assert(std::get<double>(v1) == 3.14);

assert(std::holds_alternative<int>(v1) == false);
assert(std::holds_alternative<double>(v1) == true);

assert(std::get_if<int>(&v1) == nullptr);
assert(*std::get_if<double>(&v1) == 3.14);

A few handy constexpr functions appear in the code above:

  • std::get<>: takes either an index or a type. If the index or type does not match the active alternative, it throws a std::bad_variant_access exception;
  • std::get_if<>: the non-throwing version. Note that both the argument and the return value are pointers; if the requested alternative is not active, it returns nullptr;
  • std::holds_alternative<Type>: tells you whether the variant currently holds a value of Type;

Visiting variants

To act on a variant object according to the type it currently holds, the first idea that comes to mind is a chain of if statements:

if (std::holds_alternative<int>(v1) == true) {
  // handle int value
}
else {
  // handle double value
}

But when the variant has many alternatives, defining a Visitor class helps organize the work by type and keeps the logic much clearer:

struct Visitor {
  double operator() (double d) { return d; }
  double operator() (int i) { return double(i); }
};

void show(std::variant<double, int> v) {
  std::cout << std::visit(Visitor{}, v) << std::endl;
}

void test() {
  show(3.14);
  show(1);
}