基于python的循环队列

有时需要用循环队列实时缓存最近的一段数据:队列写满后,新数据会覆盖最旧的数据。例如,可以用循环队列始终保留摄像头最近一段时间的视频帧,在事件发生时再把这些帧保存为视频文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
#coding=utf-8
import cv2
import datetime
import os
import time
import sqlite3
import threading
import pytz
import json
import logging


class CircleQueue(object):
    """Fixed-size ring buffer that overwrites its oldest item once full.

    ``front`` is the index of the slot just *before* the oldest stored item
    and ``rear`` is the index of the newest stored item. One slot is always
    left unused to tell "full" from "empty", so at most ``size - 1`` items
    are held at any time.
    """

    def __init__(self, size=90):
        """Create an empty buffer backed by ``size`` slots."""
        self.queue = [0] * size
        self.size = size
        self.rear = 0  # index of the newest stored item
        self.front = 0  # index of the slot just before the oldest item
        self.lock = False  # True while save_queue is dumping; enqueue drops items meanwhile
        self.boat_in_write = None  # writer armed by save_queue_wait
        self.boat_in = False  # True while a deferred save is pending

    def enqueue(self, item):
        """Append ``item``, discarding the oldest item when the buffer is full.

        The item is silently dropped while a save is in progress
        (``self.lock`` is True). If a deferred save was armed through
        ``save_queue_wait`` and the buffer was already full, the buffer is
        dumped to the armed writer right after ``item`` is stored.
        """
        if self.lock:
            return

        was_full = self.is_full()
        if was_full:
            # Make room by dropping the oldest item.
            self.front = (self.front + 1) % self.size
        self.rear = (self.rear + 1) % self.size
        self.queue[self.rear] = item

        if was_full and self.boat_in:
            print("save boat in")
            # Disarm before saving so a failing writer is not retried on
            # every subsequent enqueue.
            writer, self.boat_in_write = self.boat_in_write, None
            self.boat_in = False
            self.save_queue(writer)

    def dequeue(self):
        """Remove and return the oldest item.

        Prints a message and returns None when the buffer is empty.
        """
        if self.is_empty():
            print("队列为空")
            return None
        self.front = (self.front + 1) % self.size
        return self.queue[self.front]

    def is_empty(self):
        """Return True when no items are stored."""
        return self.front == self.rear

    def is_full(self):
        """Return True when the next enqueue would overwrite the oldest item."""
        return (self.rear + 1) % self.size == self.front

    def save_queue(self, out_write=None):
        """Write every stored item, oldest first, to ``out_write`` and release it.

        Args:
            out_write: object exposing ``write(item)`` and ``release()``
                (the callers in this file pass a ``cv2.VideoWriter``), or
                None to skip writing.

        The buffer contents are left in place. ``out_write`` is released and
        ``self.lock`` is cleared even if the buffer is empty or a write raises.
        """
        print("开始保存视频")
        self.lock = True
        try:
            if not self.is_empty():
                print("数组非空")
                if out_write is not None:
                    print("write is not none")
                    cursor = self.front
                    while cursor != self.rear:
                        # Advance first: ``front`` itself is the unused slot,
                        # and ``rear`` (the newest item) must be included.
                        cursor = (cursor + 1) % self.size
                        out_write.write(self.queue[cursor])
        finally:
            if out_write is not None:
                out_write.release()
            self.lock = False

    def save_queue_wait(self, out_write=None):
        """Discard the current contents and arm a deferred save to ``out_write``.

        The actual dump happens inside ``enqueue`` once the buffer has filled
        up again with items stored after this call.
        """
        # Logically empty the buffer; the slots are overwritten later.
        self.front = self.rear
        self.boat_in = True
        self.boat_in_write = out_write

    def get_mean(self):
        """Return the rounded mean over all ``size`` slots, used or not."""
        return round(sum(self.queue) / self.size)

    def test_queue(self):
        """Print the raw backing list (debug helper)."""
        print(self.queue)



# Where event clips are written and how they are encoded.
dir_save_path = "./video_out/"
size2 = (640, 320)  # (width, height) frames are resized to before buffering
fourcc = cv2.VideoWriter_fourcc(*'mp4v')

# Ring buffer shared by the capture thread and the event thread.
record = CircleQueue(600)

# Camera stream; 'rtsp://' looks like a placeholder URL — TODO confirm the real address.
cam = cv2.VideoCapture('rtsp://')
fps = cam.get(cv2.CAP_PROP_FPS)
# Native (width, height) reported by the stream.
size = tuple(
    int(cam.get(prop))
    for prop in (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT)
)


def add_record():
    """Read frames from ``cam`` forever, buffering every fifth one in ``record``.

    A frame is kept only when the read succeeded and its read index is a
    multiple of 5; kept frames are resized to ``size2`` first. The index
    advances on every read attempt, successful or not. Never returns.
    """
    frame_index = 0
    while True:
        ok, frame = cam.read()
        keep = frame_index % 5 == 0 and ok
        frame_index += 1
        if keep:
            record.enqueue(cv2.resize(frame, size2))



def _fetch_service_and_camera_ids(ai_name, cam_name):
    """Return ``(ai_id, cam_id)`` looked up by name in ``easyedge-iec.db``.

    Raises:
        sqlite3.Error: if the database cannot be queried.
        LookupError: if either name has no matching row.
    """
    conn = sqlite3.connect(r"easyedge-iec.db")
    try:
        # Placeholders instead of string formatting, so names containing
        # quotes cannot break (or inject into) the statement.
        ai_row = conn.execute(
            "SELECT uuid FROM aiservice where name=?", (ai_name,)
        ).fetchone()
        cam_row = conn.execute(
            "SELECT uuid FROM camera where name=?", (cam_name,)
        ).fetchone()
    finally:
        conn.close()
    if ai_row is None or cam_row is None:
        raise LookupError(
            "no aiservice/camera row for {!r} / {!r}".format(ai_name, cam_name)
        )
    return ai_row[0], cam_row[0]


def _mean_boat_count(rows, elapsed_seconds, fps_ai=1):
    """Return the rounded average number of detections per analysed frame.

    Args:
        rows: ``(eventtime, result)`` tuples; ``result`` is a JSON array
            (bytes or str) with one entry per detection.
        elapsed_seconds: length of the polling window in seconds.
        fps_ai: frames analysed per second — presumably the detection
            service's rate; TODO confirm.
    """
    # At least one frame, so a sub-second window cannot divide by zero.
    frame_num = max(1, int(elapsed_seconds) * fps_ai)
    print("总帧数", frame_num)

    boat_num_sum = 0
    for _, bbox in rows:
        if isinstance(bbox, bytes):
            bbox = bbox.decode('utf-8')
        boat_num_sum += len(json.loads(bbox))
    print("boat_num_sum: ", boat_num_sum)

    boat_num_mean = round(boat_num_sum / frame_num)
    print("船舶平均数", boat_num_mean)
    return boat_num_mean


def _open_event_writer(suffix):
    """Create a timestamped ``cv2.VideoWriter`` under ``dir_save_path``."""
    # VideoWriter does not create missing directories.
    os.makedirs(dir_save_path, exist_ok=True)
    stamp = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
    output_path = os.path.join(dir_save_path, stamp + suffix)
    # NOTE(review): ``fps`` is the camera rate, while the capture loop keeps
    # only a subset of frames — playback speed may be off; confirm.
    return cv2.VideoWriter(output_path, fourcc, fps, size2)


def boat(ai_name='yolov3_darknet_boat', cam_name='乌弄龙坝上放流'):
    """Poll detection events every 30 s and save a clip when the boat count changes.

    Args:
        ai_name: name of the AI service row in ``easyedge-iec.db``.
        cam_name: name of the camera row in ``easyedge-iec.db``.

    The IDs for both names are resolved first, retrying every 40 s until the
    lookup succeeds. Then, in an endless loop, events newer than the previous
    poll are read from ``easyedge-event.db`` and averaged into a boat count:

    * count dropped  -> departure: the buffered frames are saved at once;
    * count rose     -> arrival: a deferred save is armed on ``record``;
    * otherwise nothing is saved.

    Never returns normally.
    """
    # Resolve the camera and algorithm IDs from the configuration database.
    # A loop (not recursion) keeps the stack flat across repeated failures.
    while True:
        try:
            ai_id, cam_id = _fetch_service_and_camera_ids(ai_name, cam_name)
        except (sqlite3.Error, LookupError) as err:
            logging.warning("ID lookup failed (%s); retrying in 40 s", err)
            time.sleep(40)
        else:
            break
    print(ai_id, cam_id, type(cam_id), str(ai_id))

    last_date = datetime.datetime.now()
    print("now time type:", last_date, type(last_date))
    time.sleep(30)

    # Query events by algorithm ID, camera ID and time.
    cur_event = sqlite3.connect(r"easyedge-event.db")
    last_boat_num = None
    try:
        while True:
            cur_time = datetime.datetime.now()
            boat_result = cur_event.execute(
                "SELECT eventtime, result FROM event_record "
                "where serviceUUID=? and cameraUUID=? and eventTime>?",
                # str() keeps the 'YYYY-MM-DD HH:MM:SS.ffffff' text form the
                # formatted query used for the comparison.
                (ai_id, cam_id, str(last_date)),
            ).fetchall()

            if boat_result:
                elapsed = (cur_time - last_date).total_seconds()
                boat_num_mean = _mean_boat_count(boat_result, elapsed)
            else:
                boat_num_mean = 0
            last_date = cur_time

            if last_boat_num is None:
                # First pass: nothing to compare against yet.
                print("设置初始boat数量")
            elif last_boat_num > boat_num_mean:
                # Departure: dump what is already buffered.
                record.save_queue(_open_event_writer("out.mp4"))
                print("有出港事件发生")
            elif last_boat_num < boat_num_mean:
                # Arrival: start a fresh buffer and save once it fills.
                record.save_queue_wait(_open_event_writer("in.mp4"))
                print("有进港事件发生")
            else:
                print("无进出港口事件发生,当前船数量:", boat_num_mean)

            # Remember this count and wait for the next polling interval.
            last_boat_num = boat_num_mean
            time.sleep(30)
    finally:
        cur_event.close()


if __name__ == '__main__':
    # Start the event poller and the frame capture loop, each in its own thread.
    for worker in (boat, add_record):
        threading.Thread(target=worker).start()