●tagmgr模块(tagmgr.py)
类:
TagMgr
方法:
❖ 新增Tag
❖ 获取Tag对象
❖ 批量新增普通点
❖ 批量新增统计点
❖ 批量新增计算点
❖ 批量新增报警点
●举例
import unittest
from server import *
from tagmgr import *
from tag import *
from record import *
from datetime import time
class TagMgrTestCase(unittest.TestCase):
    """Example tests for the TagMgr snapshot and interpolation interfaces.

    Every test runs against a live server: ``setUp`` connects and logs in,
    ``tearDown`` disconnects.
    """

    def setUp(self):
        """Open a connection to the server and log in before each test."""
        self.server = Server('168.2.237.97', 5678)
        # Alternative target for a local server:
        #self.server = Server('127.0.0.1', 5678)
        self.server.connect(2)
        self.server.login('admin', 'admin')

    def tearDown(self):
        """Close the connection opened in ``setUp``."""
        self.server.disconnect()

    def _new_tag_mgr(self):
        """Build a TagMgr on top of the connection created in ``setUp``."""
        # The tests used to read ``self.myServer.myServer``, but setUp only
        # ever assigns ``self.server``, so that lookup raised AttributeError.
        # NOTE(review): assumes Server exposes its underlying handle as
        # ``myServer`` (as the original expression implied) -- confirm.
        return tagmgr.TagMgr(self.server.myServer)

    def _report_results(self, result, count):
        """Print the first ``count`` results and return the code to assert on.

        ``result`` is indexed the way the tests use it: ``result[0]`` is the
        overall return code, ``result[1]`` the records and ``result[2]`` the
        per-record return codes.

        Returns ``result[0]``, unless a per-record code differs from
        ``hyperdb.hd_sucess``; in that case the last such code is returned.
        """
        ret = result[0]
        for i in range(count):
            code = result[2][i]
            # Single pre-formatted argument so the output is the same under
            # the Python 2 print statement and the Python 3 print function.
            print("%s  and rtn code is %s" % (result[1][i].value, code))
            if code != hyperdb.hd_sucess:
                ret = code
        return ret

    def test_snapshots_opt(self):
        """Save snapshots for two freshly created tags and read them back."""
        tag_mgr = self._new_tag_mgr()

        # Best-effort cleanup of tags left over from an earlier run. Each
        # delete gets its own try so a failure on the first tag does not
        # skip the second one.
        for stale_name in ('tagSn1', 'tagSn2'):
            try:
                tag_mgr.delete_tag(stale_name)
            except Exception:
                print("delete tag 错误")

        ret = tag_mgr.add_tag(tagname='tagSn1', tagtype='int16')
        self.assertEqual(RIGHT, ret)
        ret = tag_mgr.add_tag(tagname='tagSn2', tagtype='int32')
        self.assertEqual(RIGHT, ret)

        tag1 = tag_mgr.get_tag('tagSn1')
        tag2 = tag_mgr.get_tag('tagSn2')

        hdRecord1 = record.Record(1000, 0)
        hdRecord1.quality = 192
        hdRecord1.tagtype = 'int16'
        hdRecord1.value = '123'

        hdRecord2 = record.Record(2000, 0)
        hdRecord2.quality = 192
        hdRecord2.tagtype = 'int32'
        hdRecord2.value = '234'

        dest_snapshot_list = [
            (tag1.tagid, hdRecord1),
            (tag2.tagid, hdRecord2),
        ]
        tag_mgr.save_snapshots(dest_snapshot_list)

        tagIDs = [tag1.tagid, tag2.tagid]
        strTagTypes = ['int16', 'int32']
        result = tag_mgr.get_snapshots(tagIDs, strTagTypes)

        # One snapshot per tag was requested. The loop bound used to be the
        # undefined name ``count`` (copied from the interpolation test).
        num = len(tagIDs)
        ret = self._report_results(result, num)
        self.assertEqual(RIGHT, ret)

    # Test the batch interpolated-data query interface.
    def test_ar_query_tags_interp_records_by_mode_query(self):
        """Query interpolated records for two system tags at many timestamps."""
        # Function-scope imports: the file-level ``from datetime import time``
        # binds ``time`` to the datetime.time class, which has no ``mktime``,
        # and ``datetime`` itself is never imported explicitly.
        import time as time_module
        from ctypes import c_int16, c_int32
        from datetime import datetime

        tag_mgr = self._new_tag_mgr()
        tag1 = tag_mgr.get_tag('hdsys_cpu')
        tag2 = tag_mgr.get_tag('hdsys_mem')

        now = datetime.now()
        hdTime = hyperdb.HDTime()
        hdTime.nSec = c_int32(int(time_module.mktime(now.timetuple())))
        # Floor division keeps the millisecond part integral on Python 3 too.
        hdTime.nMsec = c_int16(now.microsecond // 1000)

        tagIDs = []
        strTagTypes = []
        times = []
        count = 2000
        # For each tag request ``count`` timestamps, stepping back 5 seconds
        # at a time and starting 100 seconds before ``now``.
        for tag in (tag1, tag2):
            for i in range(count):
                tagIDs.append(tag.tagid)
                strTagTypes.append('int8')
                newTime = hyperdb.HDTime()
                newTime.nSec = hdTime.nSec - i * 5 - 100
                times.append(newTime)

        result = tag_mgr.get_inter_records_multitags_by_mode(
            1, tagIDs, strTagTypes, times)

        ret = self._report_results(result, count * 2)
        self.assertEqual(RIGHT, ret)