add queue 1.0

pull/1/head
deffpuzzl 2018-06-04 20:54:00 +08:00
parent 6808328888
commit 81c7db876e
20 changed files with 153 additions and 21 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

BIN
demo/drop

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -1,5 +1,6 @@
INCDIR= -I/usr/include -I$(HOME)/include -I./ -I./include -I../include
LIBDIR= -L$(HOME)/lib -L../lib -lstvm -lm -lc -ldl -lpthread
#LIBDIR= -L$(HOME)/lib -L../lib -lm -lc -ldl -lpthread -static -lstvm
LIBDIR= -L$(HOME)/lib -L../lib -lstvm -lm -lc -ldl -lpthread
CC=cc -fPIC -g
CO=-c -pg
OUTLIB=../lib
@ -7,6 +8,9 @@ OUTBIN=../bin
OBJFILE=tree.o sem.o msg.o tcp.o str.o list.o conf.o
CREATE=create
QUEUE=queue
PUSH=push
POP=pop
INSERT=insert
SELECT=select
QUERY=query
@ -21,9 +25,13 @@ CLICK=click
REPLACE=replace
PRESSURE=press_demo
all: $(CREATE) $(INSERT) $(SELECT) $(QUERY) $(DELETE) $(UPDATE) $(COUNT) $(GROUP) $(EXTREME) $(TRUNCATE) $(DROP) $(PRESSURE) $(CLICK) $(REPLACE) clean
all: $(CREATE) $(INSERT) $(QUEUE) $(PUSH) $(POP) $(SELECT) $(QUERY) $(DELETE) $(UPDATE) $(COUNT) $(GROUP) $(EXTREME) $(TRUNCATE) $(DROP) $(PRESSURE) $(CLICK) $(REPLACE) clean
$(CREATE): create.o
$(CC) -o $@ $< $(LIBDIR)
$(PUSH): push.o
$(CC) -o $@ $< $(LIBDIR)
$(POP): pop.o
$(CC) -o $@ $< $(LIBDIR)
$(INSERT): insert.o
$(CC) -o $@ $< $(LIBDIR)
$(SELECT): select.o
@ -48,6 +56,8 @@ $(CLICK): click.o
$(CC) -o $@ $< $(LIBDIR)
$(REPLACE): replace.o
$(CC) -o $@ $< $(LIBDIR)
$(QUEUE): queue.o
$(CC) -o $@ $< $(LIBDIR)
$(PRESSURE): press_demo.o
$(CC) -o $@ $< $(LIBDIR)

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -54,6 +54,7 @@
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/inotify.h>
#include <linux/futex.h>
#include <strings.h>
#include <iconv.h>
#include <dlfcn.h>

View File

@ -25,6 +25,7 @@
typedef pthread_rwlock_t RWLock;
typedef pthread_rwlockattr_t RWAttr;
typedef struct timespec Timesp;
typedef unsigned int TABLE;
typedef long long llSEQ;
typedef long (*TCREATE)(TABLE t);
@ -154,6 +155,7 @@ typedef long CREATE;
#define TYPE_INCORE 0x02
#define TYPE_CLIENT 0x03 // custom
#define TYPE_KEYVAL 0x04
#define TYPE_MQUEUE 0x05 // custom
#define TVM_NODE_INFO "localhost"
#define TVM_RUNCFG_TAG "\x01\x33\xC8\x48"
@ -189,10 +191,13 @@ typedef long CREATE;
#define IS_TRUCK_NRML(p) ((p)->m_chTag == DATA_TRUCK_NRML)
#define IS_TRUCK_LOCK(p) ((p)->m_chTag == DATA_TRUCK_LOCK)
#define SET_DATA_TRUCK(p, type) ((p)->m_chTag = type)
#define TFree(p) if(p) { free(p); p = NULL; }
#define TFgrp(p) do{vDeleteRowgrp(p);p = NULL;}while(0);
#define TFlst(p) do{vDestroyList(p);p = NULL;}while(0);
#define TClose(f) if(f) { fclose(f); f = NULL; }
#define TFree(p) if(p) { free(p); p = NULL; }
#define TFgrp(p) do{vDeleteRowgrp(p);p = NULL;}while(0);
#define TFlst(p) do{vDestroyList(p);p = NULL;}while(0);
#define TClose(f) if(f) { fclose(f); f = NULL; }
#define Futex(a,o,v,t) syscall(SYS_futex, a, o, v, t, NULL, 0)
#define Tremohold(p,r) if(p->m_bHold) r->m_lState = RESOURCE_ABLE;
/*************************************************************************************************
@ -294,6 +299,10 @@ typedef long CREATE;
#define EXTRE_SET_ERR 94 // extreme set decorate error
#define GROUP_SET_ERR 95 // group set decorate error
#define CMM_TABLE_MIS 96 // the table of field is missing
#define MQUE_WAIT_TMO 97 // queue waiting for timeout
#define MQUE_WAIT_ERR 98 // queue waiting for failure
#define MQUE_CRTE_BIG 99 // created queue is too big
#define NOT_SUPPT_OPT 100 // table does not support this operation
/*************************************************************************************************
@ -316,6 +325,7 @@ typedef long CREATE;
return RC_FAIL;
#define FINISH return RC_SUCC;
#define lCreateQueue(p,t,r,s,n) lCircleQueue(p, t, r, s, #t, n)
/*************************************************************************************************
Field assignment
*************************************************************************************************/
@ -497,7 +507,7 @@ typedef struct __SQL_FIELD
typedef struct __SYS_TVM_INDEX
{
TABLE m_table; // table
long m_lType; // table type
uint m_lType; // table type
char m_szTable[MAX_FIELD_LEN]; // table name
char m_szPart[MAX_FIELD_LEN]; // partition name
char m_szOwner[MAX_FIELD_LEN]; // owner
@ -588,6 +598,7 @@ typedef struct __TVM_RUNTIME
void *pstVoid;
uint m_lState;
uint m_lLocal;
uint m_lType;
long m_shmID; // Memory Key
long m_semID; // semaphore key
long m_lRowSize; // Record block size
@ -725,7 +736,7 @@ extern long lMakeConfig(char *pszFile);
extern long lGetQueueNum(SATvm *pstSavm, long lQid);
extern long lQueueMaxByte(SATvm *pstSavm, long lQid);
extern long lQueueRcvTime(SATvm *pstSavm, long lQid);
extern long lCreateQueue(SATvm *pstSavm, bool bCreate);
extern long lCreateQuemsg(SATvm *pstSavm, bool bCreate);
extern long lOperateSems(SATvm *pstSavm, long semID, long lSems, Benum evp);
extern long lEventWrite(SATvm *pstSavm, long lQid, void *psvData, long lSize);
extern long lCreateSems(SATvm *pstSavm, RunTime *pstRun, long lSems, long lValue);
@ -779,6 +790,7 @@ extern long lSelectSeque(SATvm *pstSavm, char *pszSQName, ulong *pulNumbe
extern long lSetSequence(SATvm *pstSavm, char *pszSQName, ulong uStart);
extern long lCustomTable(SATvm *pstSavm, TABLE t, size_t lRow, TblDef *pstDef);
extern long lCreateTable(SATvm *pstSavm, TABLE t, size_t lRow, TCREATE pfCreateFunc);
extern long lCircleQueue(SATvm *pstSavm, TABLE t, size_t lRow, size_t lSize, char *p, char *n);
extern long lInsertTrans(SATvm *pstSavm, size_t *plOffset, llSEQ *pllSeq);
@ -794,6 +806,11 @@ extern long lExtreme(SATvm *pstSavm, void *psvOut);
extern long lGroup(SATvm *pstSavm, size_t *plOut, void **ppsvOut);
extern long lQuery(SATvm *pstSavm, size_t *plOut, void **ppsvOut);
extern long lPops(SATvm *pstSavm, size_t lExpect, Timesp *tm, size_t *plOut, void **ppsvOut);
extern long lPop(SATvm *pstSavm, void *pvOut);
extern long lPushs(SATvm *pstSavm, size_t *plOut, void **ppsvOut);
extern long lPush(SATvm *pstSavm);
extern long lTableDeclare(SATvm *pstSavm);
extern long lTableFetch(SATvm *pstSavm, void *psvOut);
extern long lNextFetch(SATvm *pstSavm, void **ppvOAddr);

View File

@ -104,7 +104,8 @@ void vDebugTable(TABLE t, long eType)
fprintf(stdout, "TABLE:%9u, extern:%10ld, NAME:%s\t\nSHTree:%8ld, SHList:%10ld, "
"TblDef:%11ld\nGroup:%9ld, MaxRow:%10ld, Valid:%12ld\nlNodeNil:%6ld, lIType:%10d, "
"Table:%12ld\nIdxLen:%8ld, TreePos:%9ld, TreeRoot:%9ld\nGrpLen:%8ld, GroupPos:%8ld, "
"GroupRoot:%8ld\nData:%10ld, ReSize:%10ld, Truck:%12ld\nListPos:%7ld, ListOfs:%9ld\n",
"GroupRoot:%8ld\nData:%10ld, ReSize:%10ld, Truck:%12ld\nListPos:%7ld, ListOfs:%9ld, "
"ExSeQ:%12ld\n",
((TblDef *)pGetTblDef(t))->m_table, ((TblDef *)pGetTblDef(t))->m_lExtern,
((TblDef *)pGetTblDef(t))->m_szTable, sizeof(SHTree), sizeof(SHList), sizeof(TblDef),
((TblDef *)pGetTblDef(t))->m_lGroup, ((TblDef *)pGetTblDef(t))->m_lMaxRow,
@ -115,7 +116,7 @@ void vDebugTable(TABLE t, long eType)
((TblDef *)pGetTblDef(t))->m_lGroupPos, ((TblDef *)pGetTblDef(t))->m_lGroupRoot,
((TblDef *)pGetTblDef(t))->m_lData, ((TblDef *)pGetTblDef(t))->m_lReSize,
((TblDef *)pGetTblDef(t))->m_lTruck, ((TblDef *)pGetTblDef(t))->m_lListPos,
((TblDef *)pGetTblDef(t))->m_lListOfs);
((TblDef *)pGetTblDef(t))->m_lListOfs, ((TblDef *)pGetTblDef(t))->m_lExSeQ);
fprintf(stdout, "--------------------------------------------------------------------"
"----------\n");

View File

@ -13,7 +13,7 @@ OUTBIN=../bin
OBJFILE=tree.o sem.o msg.o tcp.o str.o list.o conf.o queue.o
TARGET=$(OUTLIB)/libstvm.a
TARDLL=$(OUTLIB)/libstvm.so
#TARDLL=$(OUTLIB)/libstvm.so
TARVER=$(OUTLIB)/libstvm.so.1.2
STVM=$(OUTBIN)/stvm
DETVM=$(OUTBIN)/detvm

View File

@ -33,7 +33,7 @@
RC_SUCC --success
RC_FAIL --failure
*************************************************************************************************/
long lCreateQueue(SATvm *pstSavm, bool bCreate)
long lCreateQuemsg(SATvm *pstSavm, bool bCreate)
{
long lQid;

View File

@ -3888,7 +3888,7 @@ long lExecuteSQL(SATvm *pstSavm, char *pszSQL)
sGetTError(pstSavm->m_lErrno));
return RC_SUCC;
}
else if(!strcasecmp(pszSQL, "show tables"))
else if(!strcasecmp(pszSQL, "show table"))
return lShowTables(pstSavm);
else if(!strcasecmp(pszSQL, "show info"))
{

View File

@ -43,12 +43,11 @@ extern long _lRenameTableByRt(SATvm *pstSavm, TABLE to, TABLE tn);
/*************************************************************************************************
macro
*************************************************************************************************/
#define Tremohold(p,r) if(p->m_bHold) r->m_lState = RESOURCE_ABLE;
/*************************************************************************************************
Error message definition
*************************************************************************************************/
static char tvmerr[100][MAX_INDEX_LEN] = {
static char tvmerr[128][MAX_INDEX_LEN] = {
"completed successfully",
"sever exception",
"index field values is null",
@ -146,6 +145,10 @@ static char tvmerr[100][MAX_INDEX_LEN] = {
"extreme set decorate error",
"group set decorate error",
"the table of field is missing",
"queue waiting for timeout",
"queue waiting for failure",
"created queue is too big",
"table does not support this operation",
"",
};
@ -1099,7 +1102,11 @@ void vHoldRelease(SATvm *pstSavm)
TFree(pstRun->pstVoid);
if(pstRun->m_pvAddr)
{
if(TYPE_MQUEUE == pstRun->m_lType && ((TblDef *)pstRun->m_pvAddr)->m_lGroup > 0)
((TblDef *)pstRun->m_pvAddr)->m_lGroup --; // process exit
shmdt(pstRun->m_pvAddr);
}
pstRun->m_pvAddr = NULL;
pstRun->m_bAttch = false;
}
@ -1133,7 +1140,11 @@ void _vTblRelease(SATvm *pstSavm, TABLE t, bool bHold)
pstRun->m_pvCurAddr = NULL;
if(pstRun->m_pvAddr)
{
if(TYPE_MQUEUE == pstRun->m_lType && ((TblDef *)pstRun->m_pvAddr)->m_lGroup > 0)
((TblDef *)pstRun->m_pvAddr)->m_lGroup --; // process exit
shmdt(pstRun->m_pvAddr);
}
pstRun->m_pvAddr = NULL;
pstRun->m_bAttch = false;
}
@ -1575,7 +1586,9 @@ long lInitSATvm(SATvm *pstSavm, TABLE t)
pstRun->m_shmID = stIndex.m_shmID;
pstRun->m_semID = stIndex.m_semID;
pstRun->m_lLocal = stIndex.m_lLocal;
pstRun->m_lType = stIndex.m_lType;
pstRun->m_lRowSize = stIndex.m_lRowSize;
return RC_SUCC;
}
@ -1835,7 +1848,8 @@ void* pInitMemTable(SATvm *pstSavm, TABLE t)
}
pstRun->m_bAttch = true;
if(TYPE_MQUEUE == pstRun->m_lType)
((TblDef *)pstRun->m_pvAddr)->m_lGroup ++; // process join
memcpy((void *)pGetTblDef(t), pstRun->m_pvAddr, sizeof(TblDef));
if(pstSavm->lSize != lGetRowSize(t))
@ -8044,7 +8058,8 @@ long lRegisterTable(SATvm *pstSavm, RunTime *pstRun, TABLE t, long lType)
TIndex stIndex;
TBoot *pstBoot = (TBoot *)pBootInitial();
if(TYPE_CLIENT != lType) return RC_SUCC;
if(TYPE_SYSTEM == lType || TYPE_INCORE == lType)
return RC_SUCC;
if(RC_SUCC != lInitSATvm(pstSavm, SYS_TVM_INDEX))
return RC_FAIL;
@ -8134,6 +8149,67 @@ long _lCustomTable(SATvm *pstSavm, TABLE t, size_t lRow, bool bCreate, long l
return RC_SUCC;
}
/*************************************************************************************************
descriptioncreate queue
parameters:
pstSavm --stvm handle
t --table
lRow --table maxrows
bCreate --create type
lType --table type
return:
RC_SUCC --success
RC_FAIL --failure
*************************************************************************************************/
long _lCreateQueue(SATvm *pstSavm, TABLE t, size_t lRow, size_t lSize, char *pszTable,
char *pszNode, bool bCover)
{
RWAttr attr;
RunTime *pstRun = NULL;
RWLock *prwLock = NULL;
if(!pstSavm || lRow <= 0)
{
pstSavm->m_lErrno = CONDIT_IS_NIL;
return RC_FAIL;
}
if((lRow >> (sizeof(int) * 8 - 1)) > 0)
{
pstSavm->m_lErrno = MQUE_CRTE_BIG;
return RC_FAIL;
}
vInitTblDef(t);
pstSavm->tblName = t;
((TblDef *)pGetTblDef(t))->m_lIType = bCover;
((TblDef *)pGetTblDef(t))->m_table = t;
((TblDef *)pGetTblDef(t))->m_lReSize = lSize;
((TblDef *)pGetTblDef(t))->m_lTruck = lSize + sizeof(SHTruck);
strncpy(((TblDef *)pGetTblDef(t))->m_szPart, pszNode, MAX_FIELD_LEN);
strncpy(((TblDef *)pGetTblDef(t))->m_szTable, pszTable, MAX_FIELD_LEN);
((TblDef *)pGetTblDef(t))->m_lTable = lInitialTable(t, lRow);
if(NULL == (pstRun = (RunTime *)pCreateBlock(pstSavm, t, ((TblDef *)pGetTblDef(t))->m_lTable,
false)))
return RC_FAIL;
memcpy(pstRun->m_pvAddr, (void *)pGetTblDef(t), sizeof(TblDef));
prwLock = (RWLock *)pGetRWLock(pstRun->m_pvAddr);
pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
pthread_rwlock_init(prwLock, &attr);
memset(pstRun->m_pvAddr + lGetTblData(t), 0, lGetTableSize(t) - lGetTblData(t));
vTblDisconnect(pstSavm, t);
if(RC_SUCC != lRegisterTable(pstSavm, pstRun, t, TYPE_MQUEUE))
{
shmctl(pstRun->m_shmID, IPC_RMID, NULL);
return RC_FAIL;
}
return RC_SUCC;
}
/*************************************************************************************************
descriptioncreate table
parameters:
@ -8660,6 +8736,22 @@ long lCustomTable(SATvm *pstSavm, TABLE t, size_t lRow, TblDef *pstDef)
return _lCustomTable(pstSavm, t, lRow, false, TYPE_CLIENT);
}
/*************************************************************************************************
descriptionAPI - CreateQueue
parameters:
pstSavm --stvm handle
t --table
lRow --table maxrows
pfCreateFunc --table field define
return:
RC_SUCC --success
RC_FAIL --failure
*************************************************************************************************/
long lCircleQueue(SATvm *pstSavm, TABLE t, size_t lRow, size_t lSize, char *pszTable, char *node)
{
return _lCreateQueue(pstSavm, t, lRow, lSize, pszTable, node, false);
}
/*************************************************************************************************
descriptionAPI - lDropTable
parameters:
@ -8679,7 +8771,6 @@ long lDropTable(SATvm *pstSavm, TABLE t)
pstSavm->bSearch = TYPE_SYSTEM;
conditinit(pstSavm, stIndex, SYS_TVM_INDEX)
conditnum(pstSavm, stIndex, m_table, t)
conditnum(pstSavm, stIndex, m_lType, TYPE_CLIENT)
if(RC_SUCC != lSelect(pstSavm, (void *)&stIndex))
return RC_FAIL;
@ -8688,7 +8779,6 @@ long lDropTable(SATvm *pstSavm, TABLE t)
{
conditinit(pstSavm, stIndex, SYS_TVM_INDEX)
conditnum(pstSavm, stIndex, m_table, t)
conditnum(pstSavm, stIndex, m_lType, TYPE_CLIENT)
if(RC_SUCC != lDelete(pstSavm))
return RC_FAIL;
@ -8713,9 +8803,15 @@ long lDropTable(SATvm *pstSavm, TABLE t)
conditinit(pstSavm, stIndex, SYS_TVM_INDEX)
conditnum(pstSavm, stIndex, m_table, t)
conditnum(pstSavm, stIndex, m_lType, TYPE_CLIENT)
if(RC_SUCC != lDelete(pstSavm)) return RC_FAIL;
if(TYPE_MQUEUE == pstRun->m_lType)
{
memset(pstRun, 0, sizeof(RunTime));
pstSavm->m_lEffect = 1;
return RC_SUCC;
}
// Delete the field table
if(RC_SUCC != lInitSATvm(pstSavm, SYS_TVM_FIELD))
return RC_FAIL;
@ -9132,7 +9228,14 @@ long lGetTblField(TABLE t, size_t *plOut, TField **ppstField)
conditinit(pstSavm, stField, SYS_TVM_FIELD)
conditnum(pstSavm, stField, m_table, t)
return lQuery(pstSavm, plOut, (void **)ppstField);
if(RC_SUCC != lQuery(pstSavm, plOut, (void **)ppstField))
{
if(NO_DATA_FOUND == pstSavm->m_lErrno)
pstSavm->m_lErrno = FIELD_NOT_DEF;
return RC_FAIL;
}
return RC_SUCC;
}
/*************************************************************************************************