为unix平台增加XZ多线程解压缩支持(基于7zip LZMA SDK, C/C++)
Note
This post has nothing to do with the pixz project. I am talking about decompressing the original xz archive using 7-zip’s LZMA SDK under unix environment.
Background
Originally the 7zip’s LZMA SDK (version 19.00) only covers parallel xz decompression for Windows systems. This post shows the C code that adds support for lib pthread, i.e. unix systems.
Compiling with C++ using the C library should also work, I have tested it on my own box.
Little Details
Actually the original writer has completed all the necessary abstraction for the multi-threading pipeline. All I have done is adding some macros and pthread equivalents to Windows threading model.
Usage
Replace lzma/C/...
with below files. The new code should work on both Windows and Unix systems.
Git Repo
See https://github.com/idailylife/lzma-xz-parallel
Source Code
Threads.h
/* Threads.h -- multithreading library
2017-06-18 : Igor Pavlov : Public domain */
#ifndef __7Z_THREADS_H
#define __7Z_THREADS_H
#ifdef _WIN32
#include <windows.h>
#else
#include <pthread.h>
#endif
#include "7zTypes.h"
EXTERN_C_BEGIN
WRes HandlePtr_Close(HANDLE *h);
WRes Handle_WaitObject(HANDLE h);
#ifdef _WIN32
typedef HANDLE CThread;
#define Thread_Construct(p) *(p) = NULL
#define Thread_WasCreated(p) (*(p) != NULL)
#define Thread_Close(p) HandlePtr_Close(p)
#define Thread_Wait(p) Handle_WaitObject(*(p))
#else
typedef void* LPVOID;
typedef pthread_t* CThread;
#define Thread_Construct(p) *(p) = NULL
#define Thread_WasCreated(p) (*(p) != NULL)
#define Thread_Close(p) HandleThread_Close(*(p))
#define Thread_Wait(p) HandleThread_Join(*(p))
WRes HandleThread_Close(pthread_t* th);
WRes HandleThread_Join(pthread_t* th);
#endif
typedef
#ifdef UNDER_CE
DWORD
#else
unsigned
#endif
THREAD_FUNC_RET_TYPE;
#define THREAD_FUNC_CALL_TYPE MY_STD_CALL
#define THREAD_FUNC_DECL THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE
typedef THREAD_FUNC_RET_TYPE (THREAD_FUNC_CALL_TYPE * THREAD_FUNC_TYPE)(void *);
WRes Thread_Create(CThread *p, THREAD_FUNC_TYPE func, LPVOID param);
#ifdef _WIN32
typedef HANDLE CEvent;
typedef CEvent CAutoResetEvent;
typedef CEvent CManualResetEvent;
#define Event_Construct(p) *(p) = NULL
#define Event_IsCreated(p) (*(p) != NULL)
#define Event_Close(p) HandlePtr_Close(p)
#define Event_Wait(p) Handle_WaitObject(*(p))
WRes Event_Set(CEvent *p);
WRes Event_Reset(CEvent *p);
WRes ManualResetEvent_Create(CManualResetEvent *p, int signaled); // not used
WRes ManualResetEvent_CreateNotSignaled(CManualResetEvent *p); // not used
WRes AutoResetEvent_Create(CAutoResetEvent *p, int signaled);
WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent *p);
#else
typedef struct {
bool state;
pthread_mutex_t mutex;
pthread_cond_t cond;
} event_t;
typedef event_t* CEvent;
typedef CEvent CAutoResetEvent;
#define Event_Construct(p) *(p) = NULL
#define Event_IsCreated(p) (*(p) != NULL)
WRes Event_Close(CEvent* p);
WRes Event_Set(CEvent *p);
WRes Event_Reset(CEvent *p);
WRes Event_Wait(CEvent* p);
WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent* p);
#endif
// CSemaphore is not used for decoding
#ifdef _WIN32
typedef HANDLE CSemaphore;
#define Semaphore_Construct(p) *(p) = NULL
#define Semaphore_IsCreated(p) (*(p) != NULL)
#define Semaphore_Close(p) HandlePtr_Close(p)
#define Semaphore_Wait(p) Handle_WaitObject(*(p))
WRes Semaphore_Create(CSemaphore *p, UInt32 initCount, UInt32 maxCount);
WRes Semaphore_ReleaseN(CSemaphore *p, UInt32 num);
WRes Semaphore_Release1(CSemaphore *p);
#endif
#ifdef _WIN32
typedef CRITICAL_SECTION CCriticalSection;
WRes CriticalSection_Init(CCriticalSection *p);
#define CriticalSection_Delete(p) DeleteCriticalSection(p)
#define CriticalSection_Enter(p) EnterCriticalSection(p)
#define CriticalSection_Leave(p) LeaveCriticalSection(p)
#else
/// use mutex instead
typedef pthread_mutex_t* CCriticalSection
WRes CriticalSection_Init(CCriticalSection *p);
WRes CriticalSection_Delete(CCriticalSection *p);
WRes CriticalSection_Enter(CCriticalSection *p);
WRes CriticalSection_Leave(CCriticalSection *p);
#endif
EXTERN_C_END
#endif
Threads.c
/* Threads.c -- multithreading library
2017-06-26 : Igor Pavlov : Public domain */
#include "Precomp.h"
#ifdef _WIN32
#ifndef UNDER_CE
#include <process.h>
#endif
#endif
#include "Threads.h"
#ifdef _WIN32
static WRes GetError()
{
DWORD res = GetLastError();
return res ? (WRes)res : 1;
}
static WRes HandleToWRes(HANDLE h) { return (h != NULL) ? 0 : GetError(); }
static WRes BOOLToWRes(BOOL v) { return v ? 0 : GetError(); }
WRes HandlePtr_Close(HANDLE *p)
{
if (*p != NULL)
{
if (!CloseHandle(*p))
return GetError();
*p = NULL;
}
return 0;
}
WRes Handle_WaitObject(HANDLE h) { return (WRes)WaitForSingleObject(h, INFINITE); }
#else
/// unix specific functions
WRes HandleThread_Close(pthread_t* th) {
free(th);
th = NULL;
return 0;
}
WRes HandleThread_Join(pthread_t* th) {
return pthread_join(*th, NULL);
}
#endif
#ifdef _WIN32
WRes Thread_Create(CThread *p, THREAD_FUNC_TYPE func, LPVOID param)
{
/* Windows Me/98/95: threadId parameter may not be NULL in _beginthreadex/CreateThread functions */
#ifdef UNDER_CE
DWORD threadId;
*p = CreateThread(0, 0, func, param, 0, &threadId);
#else
unsigned threadId;
*p = (HANDLE)_beginthreadex(NULL, 0, func, param, 0, &threadId);
#endif
/* maybe we must use errno here, but probably GetLastError() is also OK. */
return HandleToWRes(*p);
}
#else
pthread_attr_t g_th_attrs[64]; //NOTE: maximum of 64 threads
size_t g_th_index = 0;
WRes Thread_Create(CThread *p, THREAD_FUNC_TYPE func, LPVOID param)
{
*p = malloc(sizeof(pthread_t));
pthread_t* th = *p;
int ret = pthread_attr_init(&(g_th_attrs[g_th_index]));
assert(ret==0);
ret = pthread_create(th, &(g_th_attrs[g_th_index]), func, param);
g_th_index++;
return ret;
}
#endif
#ifdef _WIN32
static WRes Event_Create(CEvent *p, BOOL manualReset, int signaled)
{
*p = CreateEvent(NULL, manualReset, (signaled ? TRUE : FALSE), NULL);
return HandleToWRes(*p);
}
WRes Event_Set(CEvent *p) { return BOOLToWRes(SetEvent(*p)); }
WRes Event_Reset(CEvent *p) { return BOOLToWRes(ResetEvent(*p)); }
WRes ManualResetEvent_Create(CManualResetEvent *p, int signaled) { return Event_Create(p, TRUE, signaled); }
WRes AutoResetEvent_Create(CAutoResetEvent *p, int signaled) { return Event_Create(p, FALSE, signaled); }
WRes ManualResetEvent_CreateNotSignaled(CManualResetEvent *p) { return ManualResetEvent_Create(p, 0); }
WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent *p) { return AutoResetEvent_Create(p, 0); }
#else
///unix
WRes Event_Close(CEvent* p) {
if (!p || !(*p))
return 0;
event_t* evt = *p;
pthread_cond_destroy(&evt->cond);
pthread_mutex_destroy(&evt->mutex);
free(evt);
*p = NULL;
}
WRes Event_Set(CEvent *p) {
event_t* evt = *p;
if (pthread_mutex_lock(&evt->mutex) != 0) {
return 1;
}
evt->state = true;
if (evt->manual_reset) {
if (pthread_cond_broadcast(&evt->cond)) {
return 1;
}
} else {
if (pthread_cond_signal(&evn->cond)) {
return 1;
}
}
if (pthread_mutex_unlock(&evt->mutex)) {
return 1;
}
return 0;
}
WRes Event_Reset(CEvent* p) {
event_t* evt = *p;
if (pthread_mutex_lock(&evt->mutex)) {
return 1;
}
evt->state = false;
if (pthread_mutex_unlock(&evt->mutex)) {
return 1;
}
return 0;
}
WRes Event_Wait(CEvent* p) {
event_t* evt = *p;
if (pthread_mutex_lock(&evt->mutex)) {
return 1;
}
while (!evt->state) {
if (pthread_cond_wait(&evt->cond, &evt->mutex)) {
pthread_mutex_unlock(&evt->mutex);
return 1;
}
}
evt->state = false;
if (pthread_mutex_unlock(&evt->mutex)) {
return 1;
}
return 0;
}
WRes AutoResetEvent_CreateNotSignaled(CAutoResetEvent *p) {
*p = malloc(sizeof(event_t));
memset(evt, 0, sizeof(event_t));
evt->state = false;
evt->manual_reset = false;
if (pthread_mutex_init(&evt->mutex, NULL)) {
return 1;
}
if (pthread_cond_init(&evt->cond, NULL)) {
return 1;
}
return 0;
}
#endif
#ifdef _WIN32
WRes Semaphore_Create(CSemaphore *p, UInt32 initCount, UInt32 maxCount)
{
*p = CreateSemaphore(NULL, (LONG)initCount, (LONG)maxCount, NULL);
return HandleToWRes(*p);
}
static WRes Semaphore_Release(CSemaphore *p, LONG releaseCount, LONG *previousCount)
{ return BOOLToWRes(ReleaseSemaphore(*p, releaseCount, previousCount)); }
WRes Semaphore_ReleaseN(CSemaphore *p, UInt32 num)
{ return Semaphore_Release(p, (LONG)num, NULL); }
WRes Semaphore_Release1(CSemaphore *p) { return Semaphore_ReleaseN(p, 1); }
#endif
#ifdef _WIN32
WRes CriticalSection_Init(CCriticalSection *p)
{
/* InitializeCriticalSection can raise only STATUS_NO_MEMORY exception */
#ifdef _MSC_VER
__try
#endif
{
InitializeCriticalSection(p);
/* InitializeCriticalSectionAndSpinCount(p, 0); */
}
#ifdef _MSC_VER
__except (EXCEPTION_EXECUTE_HANDLER) { return 1; }
#endif
return 0;
}
#else
WRes CriticalSection_Init(CCriticalSection *p) {
*p = malloc(sizeof(pthread_mutex_t));
if (pthread_mutex_init(*p, NULL)) {
return 1;
}
return 0;
}
WRes CriticalSection_Delete(CCriticalSection *p) {
pthread_mutex_t* mtx = *p;
return pthread_mutex_destroy(mtx);
}
WRes CriticalSection_Enter(CCriticalSection *p) {
if (pthread_mutex_lock(&evt->mutex)) {
return 1;
}
return 0;
}
WRes CriticalSection_Leave(CCriticalSection *p) {
if (pthread_mutex_unlock(&evt->mutex)) {
return 1;
}
return 0;
}
#endif
XzDec.c
add the following macro
#ifndef _WIN32
#define S_OK 0x00000000
#define E_FAIL 0x80004005
#endif
MtDec.c
replace function ThreadFUnc1
with:
static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc1(void* pp)
{
static int g_ok_stat = 0x0;
static int g_err_stat = 0x80004005;
WRes res;
CMtDecThread *t = (CMtDecThread*)pp;
CMtDec *p;
res = ThreadFunc2(t);
p = t->mtDec;
if (res == 0) {
#ifdef _WIN32
return p->exitThreadWRes;
#else
if (p->exitThreadWRes) { return &g_err_stat; }
else { return &g_ok_stat; }
#endif
}
{
// it's unexpected situation for some threading function error
if (p->exitThreadWRes == 0)
p->exitThreadWRes = res;
PRF(printf("\nthread exit error = %d\n", res));
p->exitThread = True;
Event_Set(&p->threads[0].canRead);
Event_Set(&p->threads[0].canWrite);
MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));
}
#ifdef _WIN32
return res;
#else
return &g_err_stat;
#endif
}