
By default, C++ objects wrapped with boost python do not support pickling; calling pickle.dumps(obj) raises this error:

Pickling of "xxx" instances is not enabled.

A small gripe here: even in the latest code, the reference link given in that error message is broken. The working one is https://www.boost.org/doc/libs/1_74_0/libs/python/doc/html/reference/topics/pickle_support.html.

Making your class support the pickle protocol

To make your C++ class support the pickle protocol, the "orthodox" way is to use the boost::python::pickle_suite that boost provides. Let the code speak:

struct world_t {
    world_t(const std::string& country) : country_(country) {}
    const std::string& country() const { return country_; }
private:
    std::string country_;
};

struct world_pickle_suite : boost::python::pickle_suite
  {
    static
    boost::python::tuple
    getinitargs(const world_t& w)
    {
      // [optional] Return a boost::python::tuple whose values are passed
      // to the constructor when unpickling. If the class's constructor
      // takes **no arguments**, this method can be omitted.
      return boost::python::make_tuple(w.country());
    }

    static
    boost::python::tuple
    getstate(const world_t& w)
    {
      // [optional] If the constructor alone cannot fully restore the
      // object's state, return the remaining state here
    }

    static
    void
    setstate(world_t& w, boost::python::tuple state)
    {
      // [optional] If the constructor alone cannot fully restore the
      // object's state, restore the state carried in `state` onto w here
    }
  };

boost::python::class_<world_t>("world", boost::python::init<const std::string&>())
  // define the boost python wrapper class for world_t
  .def_pickle(world_pickle_suite());
  // ...

Note that if you define getstate/setstate in the suite and the class's __dict__ attribute is non-empty, boost will raise an error. One typical case is when the class you wrap is a subclass; you then also have to provide the __getstate_manages_dict__ attribute yourself. I won't go into it here; see the pickle support documentation linked above.

How it works, briefly

As is well known, to make a custom class picklable in Python 3, at minimum one of the following must hold:

  • its __dict__ attribute is picklable, or;
  • the value returned by __getstate__() is picklable; __setstate__() is then called to restore that state when unpickling.

Behind these functions, one level lower, the machinery actually works through the __reduce__() method (more strictly, pickle first checks whether __reduce_ex__() is available). Simply put, this method returns a tuple containing everything needed to construct the object and restore its state.

In fact, these methods are part of the copy protocol which implements the __reduce__() special method. -- https://docs.python.org/3/library/pickle.html
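To make that contract concrete, here is a minimal pure-Python sketch (the World class and its fields are invented for illustration): __reduce__() returns a tuple of (callable, constructor arguments, state), and pickle calls __setstate__() with that state on load.

```python
import pickle

class World:
    """Hypothetical pure-Python analogue of the world_t example above."""
    def __init__(self, country):
        self.country = country
        self.visited = 0  # state the constructor alone cannot restore

    def __reduce__(self):
        # (callable, args for the callable, state handed to __setstate__)
        return (World, (self.country,), {"visited": self.visited})

    def __setstate__(self, state):
        self.visited = state["visited"]

w = World("France")
w.visited = 3
w2 = pickle.loads(pickle.dumps(w))
print(w2.country, w2.visited)  # prints: France 3
```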

So boost::python implements an object's __reduce__() method itself, in src/object/pickle_support.cpp (source file here). It mainly does the following:

  1. Get the current object's __class__ attribute (i.e. the class object);
  2. Check whether the object has a __safe_for_unpickling__ attribute; if not, you get the error quoted at the top of this post;
  3. Check whether the object has a __getinitargs__() method and call it if present; if absent, unpickling assumes a no-argument constructor;
  4. Check whether the object has a __getstate__() method and call it if present;
  5. Check whether the object has a __dict__ attribute; if so, also check for __getstate_manages_dict__ and fetch the values of both;
  6. Return a tuple() whose contents are the values collected in steps 1-5.

As you can see, this __reduce__() follows the object.__reduce__() contract in Python. Of course, if inheriting from a suite class feels cumbersome or does not cover your needs, the same can be achieved by other means, as long as you remember to wire up a __reduce__() of your own that satisfies Python's contract:

class_obj.attr("__reduce__") = boost::python::object(/*YOUR REDUCE FUNCTION*/);

Going a little deeper~

If you are as idle as I am, let's look at how cpython itself uses __getstate__() when it wants a __reduce__().

In cpython/Objects/typeobject.c there is a function called

static PyObject* _common_reduce(PyObject *self, int proto)

which, for pickle protocol 2 or higher, calls static PyObject * reduce_newobj(PyObject *obj); this is the default reduce logic for most objects in Python. It contains the line:

state = _PyObject_GetState(obj,
                !hasargs && !PyList_Check(obj) && !PyDict_Check(obj));

_PyObject_GetState roughly fetches the object's state from its __dict__ or __slots__ attributes.

A bit further up you can also see a call to _PyObject_GetNewArguments(), whose logic fetches the value of __getnewargs_ex__ or __getnewargs__ and uses it as the arguments passed to __new__ when creating the new object. These two are among the four recommended "high-level" methods described in the pickle protocol documentation; correspondingly, Python would rather you not hand-write a __reduce__() yourself.
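A quick toy demonstration of __getnewargs__() on the Python side (the Point class is made up for illustration): for classes whose state is fixed in __new__, pickle needs to know which arguments to hand to __new__ when rebuilding the object.

```python
import pickle

class Point(tuple):
    """Immutable tuple subclass: its state is set in __new__, not __init__."""
    def __new__(cls, x, y):
        return super().__new__(cls, (x, y))

    def __getnewargs__(self):
        # re-passed to __new__ when unpickling
        return tuple(self)

p = pickle.loads(pickle.dumps(Point(1, 2)))
print(p, type(p).__name__)  # prints: (1, 2) Point
```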

With that in mind, looking back at boost::python: getinitargs() and getstate() in the pickle_suite correspond to __getnewargs__() and __getstate__() respectively.

Aside

My girlfriend's birthday is coming up and money is tight this year, so I plan to build a little WeChat official-account toy to amuse her 😜. The server running this site is nothing special, but the only thing on it is a typecho PHP project with barely any traffic, so I'll reuse it as a reverse proxy in front of the WeChat account backend.

Goal

Main domain example.com:

  • ports 80/443 go straight to the existing site

Subdomain wechat.example.com:

  • port 80 reverse-proxies to the local service http://localhost:8766/

Implementation

It is actually simple: just add a new VirtualHost. Create /etc/apache2/sites-available/wechat.conf with the following content:

<VirtualHost *:80>
        ServerName wechat.example.com

        ServerAdmin webmaster@localhost

        ProxyPass / http://localhost:8766/
        ProxyPassReverse / http://localhost:8766/

        ErrorLog ${APACHE_LOG_DIR}/error.log
        CustomLog ${APACHE_LOG_DIR}/access.log combined
</VirtualHost>

The key directives are ProxyPass and ProxyPassReverse.

Once done, enable the VirtualHost with a2ensite wechat, then systemctl reload apache2 and you're all set.
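One caveat worth noting: ProxyPass/ProxyPassReverse are provided by mod_proxy and mod_proxy_http, which Debian/Ubuntu do not enable by default. A typical command sequence (assuming the stock a2enmod/a2ensite helpers) would be:

```shell
# enable the proxy modules the ProxyPass directives depend on
sudo a2enmod proxy proxy_http
# enable the new VirtualHost and reload apache
sudo a2ensite wechat
sudo systemctl reload apache2
```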

Background

A while back I noticed that in a Jupyter Notebook holding a lot of data, spawning a multiprocessing pool becomes extremely slow. At first I blamed Python, but after replacing multiprocessing.pool with direct os.fork() calls the problem persisted. In theory, forking on unix uses copy-on-write, so memory growth should be modest; yet in htop memory still grew right after fork, linearly with the number of child processes.

Cause

After pondering for quite a while, I suspected the page tables. Sure enough, huge pages were disabled on that server (no idea why...).

In a nutshell, fork copies the memory page tables from the old Process Control Block to the new PCB; the memory contents themselves are not copied. The data-processing job running in that Notebook was already using a lot of memory (100 GB+), so with the default 4 KB pages there are 100 * 1024 * 1024 / 4 = 26,214,400 page-table entries to copy! At a typical 4 bytes per Page Table Entry, a single process burns roughly 100 MB of memory on page tables alone.
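The arithmetic can be sanity-checked in a few lines (assuming 4 KiB pages and the 4-byte PTE figure used above; on x86-64 an entry is actually 8 bytes, doubling the result):

```python
mem_bytes = 100 * 1024**3   # ~100 GiB resident in the parent
page_size = 4 * 1024        # default small page
pte_size = 4                # per-entry size assumed above

pages = mem_bytes // page_size   # page-table entries to copy on fork()
table_bytes = pages * pte_size   # page-table memory per child
print(pages)                     # 26214400
print(table_bytes // 2**20)      # 100 (MiB)

# with 2 MiB huge pages the entry count collapses
huge_pages = mem_bytes // (2 * 1024**2)
print(huge_pages)                # 51200
```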

Mutual information (MI) between a feature X and an outcome y is a way to measure the dependence between two variables, extending the notion of correlation to nonlinear relationships. Concretely, it quantifies how much information about one random variable can be obtained through the other.

The concept of MI is inseparable from that of entropy, which measures the amount of information carried by a random variable. Formally, the mutual information I(X;Y) of two random variables (X, Y) is defined as follows.

Continuous form:

I(X;Y) = \int_Y \int_X p(x,y) \log\frac{p(x,y)}{p(x)\,p(y)} \,dx\,dy

Discrete form:

I(X;Y) = \sum_{y}\sum_{x} p(x,y) \log\frac{p(x,y)}{p(x)\,p(y)}

sklearn.feature_selection.mutual_info_regression implements the computation of MI between every feature and a continuous target, and can be used to pick out the features most likely to carry predictive information. A classifier counterpart (mutual_info_classif) is also provided.
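The discrete form is easy to verify by hand. A small sketch with an invented 2x2 joint distribution (no sklearn involved) follows the definition directly:

```python
from math import log2

# toy joint distribution p(x, y); both marginals come out to 0.5
joint = {
    (0, 0): 0.4, (0, 1): 0.1,
    (1, 0): 0.1, (1, 1): 0.4,
}
px = {x: sum(p for (xi, _), p in joint.items() if xi == x) for x in (0, 1)}
py = {y: sum(p for (_, yi), p in joint.items() if yi == y) for y in (0, 1)}

# I(X;Y) = sum_{x,y} p(x,y) * log( p(x,y) / (p(x) p(y)) )
mi = sum(p * log2(p / (px[x] * py[y])) for (x, y), p in joint.items() if p > 0)
print(round(mi, 4))  # ~0.2781 bits: X and Y are clearly dependent
```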

This article is a full translation from Stefan Jansen's *Hands-On Machine Learning for Algorithmic Trading*.

Adding multi-threaded XZ decompression support for unix platforms (based on the 7zip LZMA SDK, C/C++)

Note

This post has nothing to do with the pixz project. I am talking about decompressing an original xz archive using 7-zip's LZMA SDK in a unix environment.

Background

The 7zip LZMA SDK (version 19.00) originally implements parallel xz decompression only for Windows systems. This post shows the C code that adds support via libpthread, i.e. for unix systems. Compiling the C library as C++ should also work; I have tested it on my own box.

Little Details

Actually the original author had already completed all the necessary abstraction for the multi-threading pipeline. All I did was add some macros and pthread equivalents of the Windows threading primitives.

Usage

Replace the corresponding files under lzma/C/... with the files below. The new code should work on both Windows and unix systems.

Git Repo

See https://github.com/idailylife/lzma-xz-parallel

Source Code

Threads.h

/* Threads.h -- multithreading library
2017-06-18 : Igor Pavlov : Public domain */

#ifndef __7Z_THREADS_H
#define __7Z_THREADS_H

#ifdef _WIN32
#include <windows.h>
#else
#include <pthread.h>
#endif

#include "7zTypes.h"

EXTERN_C_BEGIN

WRes HandlePtr_Close(HANDLE *h);
WRes Handle_WaitObject(HANDLE h);

#ifdef _WIN32
typedef HANDLE CThread;
#define Thread_Construct(p) *(p) = NULL
#define Thread_WasCreated(p) (*(p) != NULL)
#define Thread_Close(p) HandlePtr_Close(p)
#define Thread_Wait(p) Handle_WaitObject(*(p))
#else
typedef void* LPVOID;
typedef pthread_t* CThread;
#define Thread_Construct(p) *(p) = NULL
#define Thread_WasCreated(p) (*(p) != NULL)
#define Thread_Close(p) HandleThread_Close(*(p))
#define Thread_Wait(p) HandleThread_Join(*(p))
WRes HandleThread_Close(pthread_t* th);
WRes HandleThread_Join(pthread_t* th);

#endif

typedef
#ifdef _WIN32
  #ifdef UNDER_CE
  DWORD
  #else
  unsigned
  #endif
#else
  void *  /* pthread thread functions must return void* */
#endif
  THREAD_FUNC_RET_TYPE;

#ifdef _WIN32
#define THREAD_FUNC_CALL_TYPE MY_STD_CALL
#else
#define THREAD_FUNC_CALL_TYPE  /* no __stdcall outside Windows */
#endif
#define THREAD_FUNC_DECL THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE
typedef THREAD_FUNC_RET_TYPE (THREAD_FUNC_CALL_TYPE * THREAD_FUNC_TYPE)(void *);
WRes Thread_Create(CThread *p, THREAD_FUNC_TYPE func, LPVOID param);

#ifdef _WIN32

typedef HANDLE CEvent;
typedef CEvent CAutoResetEvent;
typedef CEvent CManualResetEvent;
#define Event_Construct(p) *(p) = NULL
#define Event_IsCreated(p) (*(p) != NULL)
#define Event_Close(p) HandlePtr_Close(p)
#define Event_Wait(p) Handle_WaitObject(*(p))
WRes Event_Set(CEvent *p);
WRes Event_Reset(CEvent *p);
WRes ManualResetEvent_Create(CManualResetEvent *p, int signaled); // not used
WRes ManualResetEvent_CreateNotSignaled(CManualResetEvent *p); // not used
WRes AutoResetEvent_Create(CAutoResetEvent *p, int signaled);
WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent *p);

#else
#include <stdbool.h>  /* for bool in C */

typedef struct {
  bool state;
  bool manual_reset;  /* referenced by Event_Set in Threads.c */
  pthread_mutex_t mutex;
  pthread_cond_t cond;
} event_t;

typedef event_t* CEvent;
typedef CEvent CAutoResetEvent;
#define Event_Construct(p) *(p) = NULL
#define Event_IsCreated(p) (*(p) != NULL)

WRes Event_Close(CEvent* p);
WRes Event_Set(CEvent *p);
WRes Event_Reset(CEvent *p);
WRes Event_Wait(CEvent* p);
WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent* p);

#endif

// CSemaphore is not used for decoding
#ifdef _WIN32
typedef HANDLE CSemaphore;
#define Semaphore_Construct(p) *(p) = NULL
#define Semaphore_IsCreated(p) (*(p) != NULL)
#define Semaphore_Close(p) HandlePtr_Close(p)
#define Semaphore_Wait(p) Handle_WaitObject(*(p))
WRes Semaphore_Create(CSemaphore *p, UInt32 initCount, UInt32 maxCount);
WRes Semaphore_ReleaseN(CSemaphore *p, UInt32 num);
WRes Semaphore_Release1(CSemaphore *p);
#endif

#ifdef _WIN32

typedef CRITICAL_SECTION CCriticalSection;
WRes CriticalSection_Init(CCriticalSection *p);
#define CriticalSection_Delete(p) DeleteCriticalSection(p)
#define CriticalSection_Enter(p) EnterCriticalSection(p)
#define CriticalSection_Leave(p) LeaveCriticalSection(p)

#else
/// use mutex instead
typedef pthread_mutex_t* CCriticalSection;
WRes CriticalSection_Init(CCriticalSection *p);
WRes CriticalSection_Delete(CCriticalSection *p);
WRes CriticalSection_Enter(CCriticalSection *p);
WRes CriticalSection_Leave(CCriticalSection *p);

#endif
EXTERN_C_END

#endif

Threads.c

/* Threads.c -- multithreading library
2017-06-26 : Igor Pavlov : Public domain */

#include "Precomp.h"

#ifdef _WIN32
  #ifndef UNDER_CE
  #include <process.h>
  #endif
#endif

#include "Threads.h"

#ifdef _WIN32
static WRes GetError()
{
  DWORD res = GetLastError();
  return res ? (WRes)res : 1;
}

static WRes HandleToWRes(HANDLE h) { return (h != NULL) ? 0 : GetError(); }
static WRes BOOLToWRes(BOOL v) { return v ? 0 : GetError(); }

WRes HandlePtr_Close(HANDLE *p)
{
  if (*p != NULL)
  {
    if (!CloseHandle(*p))
      return GetError();
    *p = NULL;
  }
  return 0;
}

WRes Handle_WaitObject(HANDLE h) { return (WRes)WaitForSingleObject(h, INFINITE); }

#else
/// unix specific functions

#include <stdlib.h>  /* malloc, free */
#include <string.h>  /* memset */
#include <assert.h>

WRes HandleThread_Close(pthread_t* th) {
  free(th);  /* allocated in Thread_Create */
  return 0;
}

WRes HandleThread_Join(pthread_t* th) {
  return pthread_join(*th, NULL);
}

#endif

#ifdef _WIN32
WRes Thread_Create(CThread *p, THREAD_FUNC_TYPE func, LPVOID param)
{
  /* Windows Me/98/95: threadId parameter may not be NULL in _beginthreadex/CreateThread functions */

  #ifdef UNDER_CE

  DWORD threadId;
  *p = CreateThread(0, 0, func, param, 0, &threadId);

  #else

  unsigned threadId;
  *p = (HANDLE)_beginthreadex(NULL, 0, func, param, 0, &threadId);

  #endif

  /* maybe we must use errno here, but probably GetLastError() is also OK. */
  return HandleToWRes(*p);
}
#else
pthread_attr_t g_th_attrs[64]; //NOTE: maximum of 64 threads
size_t g_th_index = 0;

WRes Thread_Create(CThread *p, THREAD_FUNC_TYPE func, LPVOID param)
{
  pthread_t* th;
  int ret;
  assert(g_th_index < sizeof(g_th_attrs) / sizeof(g_th_attrs[0]));
  *p = malloc(sizeof(pthread_t));
  th = *p;
  ret = pthread_attr_init(&(g_th_attrs[g_th_index]));
  assert(ret == 0);
  ret = pthread_create(th, &(g_th_attrs[g_th_index]), func, param);
  g_th_index++;
  return ret;
}
#endif

#ifdef _WIN32
static WRes Event_Create(CEvent *p, BOOL manualReset, int signaled)
{
  *p = CreateEvent(NULL, manualReset, (signaled ? TRUE : FALSE), NULL);
  return HandleToWRes(*p);
}

WRes Event_Set(CEvent *p) { return BOOLToWRes(SetEvent(*p)); }
WRes Event_Reset(CEvent *p) { return BOOLToWRes(ResetEvent(*p)); }

WRes ManualResetEvent_Create(CManualResetEvent *p, int signaled) { return Event_Create(p, TRUE, signaled); }
WRes AutoResetEvent_Create(CAutoResetEvent *p, int signaled) { return Event_Create(p, FALSE, signaled); }
WRes ManualResetEvent_CreateNotSignaled(CManualResetEvent *p) { return ManualResetEvent_Create(p, 0); }
WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent *p) { return AutoResetEvent_Create(p, 0); }

#else 
///unix

WRes Event_Close(CEvent* p) {
  if (!p || !(*p))
    return 0;
  event_t* evt = *p;
  pthread_cond_destroy(&evt->cond);
  pthread_mutex_destroy(&evt->mutex);
  free(evt);
  *p = NULL;
  return 0;
}

WRes Event_Set(CEvent *p) {
  event_t* evt = *p;
  if (pthread_mutex_lock(&evt->mutex) != 0) {
    return 1;
  }
  evt->state = true;

  if (evt->manual_reset) {
    if (pthread_cond_broadcast(&evt->cond)) {
      return 1;
    }
  } else {
    if (pthread_cond_signal(&evt->cond)) {
      return 1;
    }
  }

  if (pthread_mutex_unlock(&evt->mutex)) {
    return 1;
  }

  return 0;
}

WRes Event_Reset(CEvent* p) {
  event_t* evt = *p;
  if (pthread_mutex_lock(&evt->mutex)) {
    return 1;
  }

  evt->state = false;

  if (pthread_mutex_unlock(&evt->mutex)) {
    return 1;
  }

  return 0;
}

WRes Event_Wait(CEvent* p) {
  event_t* evt = *p;
  if (pthread_mutex_lock(&evt->mutex)) {
    return 1;
  }

  while (!evt->state) {
    if (pthread_cond_wait(&evt->cond, &evt->mutex)) {
      pthread_mutex_unlock(&evt->mutex);
      return 1;
    }
  }

  evt->state = false;
  if (pthread_mutex_unlock(&evt->mutex)) {
    return 1;
  }
  return 0;
}

WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent *p) {
  event_t* evt = malloc(sizeof(event_t));  /* freed in Event_Close */
  *p = evt;
  memset(evt, 0, sizeof(event_t));
  evt->state = false;
  evt->manual_reset = false;
  if (pthread_mutex_init(&evt->mutex, NULL)) {
    return 1;
  }
  if (pthread_cond_init(&evt->cond, NULL)) {
    return 1;
  }
  return 0;
}

#endif

#ifdef _WIN32

WRes Semaphore_Create(CSemaphore *p, UInt32 initCount, UInt32 maxCount)
{
  *p = CreateSemaphore(NULL, (LONG)initCount, (LONG)maxCount, NULL);
  return HandleToWRes(*p);
}

static WRes Semaphore_Release(CSemaphore *p, LONG releaseCount, LONG *previousCount)
  { return BOOLToWRes(ReleaseSemaphore(*p, releaseCount, previousCount)); }
WRes Semaphore_ReleaseN(CSemaphore *p, UInt32 num)
  { return Semaphore_Release(p, (LONG)num, NULL); }
WRes Semaphore_Release1(CSemaphore *p) { return Semaphore_ReleaseN(p, 1); }

#endif

#ifdef _WIN32
WRes CriticalSection_Init(CCriticalSection *p)
{
  /* InitializeCriticalSection can raise only STATUS_NO_MEMORY exception */
  #ifdef _MSC_VER
  __try
  #endif
  {
    InitializeCriticalSection(p);
    /* InitializeCriticalSectionAndSpinCount(p, 0); */
  }
  #ifdef _MSC_VER
  __except (EXCEPTION_EXECUTE_HANDLER) { return 1; }
  #endif
  return 0;
}

#else
WRes CriticalSection_Init(CCriticalSection *p) {
  *p = malloc(sizeof(pthread_mutex_t));
  if (pthread_mutex_init(*p, NULL)) {
    return 1;
  }
  return 0;
}

WRes CriticalSection_Delete(CCriticalSection *p) {
  pthread_mutex_t* mtx = *p;
  return pthread_mutex_destroy(mtx);
}

WRes CriticalSection_Enter(CCriticalSection *p) {
  if (pthread_mutex_lock(*p)) {
    return 1;
  }
  return 0;
}

WRes CriticalSection_Leave(CCriticalSection *p) {
  if (pthread_mutex_unlock(*p)) {
    return 1;
  }
  return 0;
}

#endif

XzDec.c

Add the following macros:

#ifndef _WIN32
#define S_OK 0x00000000
#define E_FAIL 0x80004005
#endif

MtDec.c

Replace the function ThreadFunc1 with:

static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc1(void* pp)
{
  static int g_ok_stat = 0x0;
  static int g_err_stat = 0x80004005;
  WRes res;
  CMtDecThread *t = (CMtDecThread*)pp;
  CMtDec *p;

  res = ThreadFunc2(t);
  p = t->mtDec;
  if (res == 0) {
#ifdef _WIN32
    return p->exitThreadWRes;
#else
    if (p->exitThreadWRes) { return &g_err_stat; }
    else { return &g_ok_stat; }
#endif
  }
  {
    // it's unexpected situation for some threading function error
    if (p->exitThreadWRes == 0)
      p->exitThreadWRes = res;
    PRF(printf("\nthread exit error = %d\n", res));
    p->exitThread = True;
    Event_Set(&p->threads[0].canRead);
    Event_Set(&p->threads[0].canWrite);
    MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));
  }
#ifdef _WIN32
    return res;
#else
    return &g_err_stat;
#endif
}