tagmgr模块

Navigation:  Python > TagMgr类管理 >

tagmgr模块

Previous page | Return to chapter overview | Next page

 

tagmgr模块(tagmgr.py)

 

类:

TagMgr

 

方法:

新增tag

 

获取Tag对象

 

根据tagID获取tagType

 

删除指定tagname的tag点

 

按照查询条件查找满足条件的普通点的迭代器

 

批量新增普通点

 

批量修改普通点属性

 

批量新增统计点

 

批量修改统计点属性

 

批量新增计算点

 

批量修改计算点属性

 

批量新增报警点

 

批量修改报警点属性

 

按照查询条件查找满足条件的统计点的迭代器

 

按照查询条件查找满足条件的计算点的迭代器

 

按照查询条件查找满足条件的报警点的迭代器

 

举例

 

import unittest

from server import *

from tagmgr import *

from tag import *

from record import *

from datetime import time

 

class TagMgrTestCase(unittest.TestCase):

  def setUp(self):

      self.server = Server('168.2.237.97', 5678)

      #self.server = Server('127.0.0.1', 5678)

      self.server.connect(2)

      self.server.login('admin', 'admin')

 

  def tearDown(self):

      self.server.disconnect()

 

 def test_snapshots_opt(self):

      tag_mgr = tagmgr.TagMgr(self.myServer.myServer)

      try:

          ret = tag_mgr.delete_tag('tagSn1')

          ret = tag_mgr.delete_tag('tagSn2')

      except :

          print "delete tag 错误"

      ret = tag_mgr.add_tag(tagname = 'tagSn1', tagtype = 'int16')

      unittest.TestCase.assertEqual(self,RIGHT, ret)

      ret = tag_mgr.add_tag(tagname = 'tagSn2', tagtype = 'int32')

      unittest.TestCase.assertEqual(self,RIGHT, ret)

 

      tag1 = tag_mgr.get_tag('tagSn1')

      tag2 = tag_mgr.get_tag('tagSn2')

      snapshots = []

      hdRecord1 = record.Record(1000, 0)

      hdRecord1.quality = 192

      hdRecord1.tagtype = 'int16'

      hdRecord1.value = '123'

      hdRecord2 = record.Record(2000, 0)

      hdRecord2.quality = 192

      hdRecord2.tagtype = 'int32'

      hdRecord2.value = '234'  

 

      dest_snapshot_list = []

      dest_snapshot_list.append((tag1.tagid,hdRecord1))

      dest_snapshot_list.append((tag2.tagid,hdRecord2))

      tag_mgr.save_snapshots(dest_snapshot_list)

 

      tagIDs = []

      strTagTypes = []

      tagIDs.append(tag1.tagid)

      tagIDs.append(tag2.tagid)

      strTagTypes.append('int16')

      strTagTypes.append('int32')

 

      result = tag_mgr.get_snapshots(tagIDs,strTagTypes)

      num = 2

      ret = result[0]

      for i in range(count*2):

           print result[1][i].value,' and rtn code is',result[2][i]

           if(result[2][i] != hyperdb.hd_sucess):

               ret = result[2][i]

               unittest.TestCase.assertEqual(self,RIGHT, ret)

 

  #测试批量插值数据接口

  def test_ar_query_tags_interp_records_by_mode_query(self):

      tag_mgr = tagmgr.TagMgr(self.myServer.myServer)

      tag1 = tag_mgr.get_tag('hdsys_cpu')

      tag2 = tag_mgr.get_tag('hdsys_mem')

      tagIDs = []

      strTagTypes = []

 

      dt  = datetime.now()

      hdTime = hyperdb.HDTime()

      hdTime.nSec = c_int32(int(time.mktime(dt.timetuple())))

      hdTime.nMsec = c_int16(dt.microsecond/1000)

      times = []

      count = 2000

      for i in range (count):

          tagIDs.append(tag1.tagid)

          strTagTypes.append('int8')

          newTime = hyperdb.HDTime()

          newTime.nSec = hdTime.nSec - i*5-100

          times.append(newTime)

      for i in range (count):

          tagIDs.append(tag2.tagid)

          strTagTypes.append('int8')

          newTime = hyperdb.HDTime()

          newTime.nSec = hdTime.nSec - i*5 -100

          times.append(newTime)

 

      result = tag_mgr.get_inter_records_multitags_by_mode(1, tagIDs,strTagTypes,times)

      ret = result[0]

      for i in range(count*2):

           print result[1][i].value,' and rtn code is',result[2][i]

           if(result[2][i] != hyperdb.hd_sucess):

               ret = result[2][i]

               unittest.TestCase.assertEqual(self,RIGHT, ret)